diff --git a/README.rst b/README.rst index ffe4943..17ff8a7 100644 --- a/README.rst +++ b/README.rst @@ -40,6 +40,7 @@ Testing Set the following environment variables with your own values before running the unit tests:: + make -C tcpproxy export MOMOKO_TEST_DB='your_db' export MOMOKO_TEST_USER='your_user' export MOMOKO_TEST_PASSWORD='your_password' diff --git a/docs/installation.rst b/docs/installation.rst index ee4e039..aaf2a40 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -22,7 +22,7 @@ can also be used by setting the ``MOMOKO_PSYCOPG2_IMPL`` environment variable to # 'psycopg2' or 'psycopg2cffi' export MOMOKO_PSYCOPG2_IMPL='psycopg2cffi' -The unit tests als use this variable. It needs to be set if something else is used +The unit tests all use this variable. It needs to be set if something else is used instead of Psycopg2 when running the unit tests. Besides ``MOMOKO_PSYCOPG2_IMPL`` there are also other variables that need to be set for the unit tests. @@ -37,11 +37,17 @@ Here's an example for the environment variables:: # Set to '0' if hstore extension isn't enabled export MOMOKO_TEST_HSTORE='1' # Default: 0 -And running the tests is easy:: +Momoko tests use tcproxy_ for simulating Postgres server unavailablity. The copy +of tcproxy is bundled with Momoko, but you need to build it first:: + + make -C tcproxy + +Finally, running the tests is easy:: python setup.py test +.. _tcproxy: https://github.com/dccmx/tcproxy .. _psycopg2cffi: http://pypi.python.org/pypi/psycopg2cffi .. _Tornado: http://www.tornadoweb.org/ .. _Psycopg2: http://initd.org/psycopg/ diff --git a/momoko/connection.py b/momoko/connection.py index 6598eb4..2fe25f2 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -173,7 +173,7 @@ class Pool(object): :param list setsession: List of intial sql commands to be executed once connection is established. - If any of the commands failes, the connection will be closed. + If any of the commands fails, the connection will be closed. **NOTE:** The commands will be executed as one transaction block. :param bool auto_shrink: @@ -195,6 +195,10 @@ class Pool(object): .. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection .. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor """ + + class DatabaseNotAvailable(psycopg2.DatabaseError): + """Raised when Pool can not connect to database server""" + def __init__(self, dsn, connection_factory=None, @@ -232,7 +236,7 @@ def __init__(self, self.conns = ConnectionContainer() self._last_connect_time = 0 - self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available") + self._no_conn_availble_error = self.DatabaseNotAvailable("No database connection available") self.shrink_period = shrink_period self.shrink_delay = shrink_delay self.auto_shrink = auto_shrink @@ -440,7 +444,7 @@ def when_available(fut): conn = fut.result() except psycopg2.Error as error: future.set_exc_info(sys.exc_info()) - if retry: + if retry and not keep: self.putconn(retry[0]) return @@ -448,28 +452,27 @@ def when_available(fut): try: future_or_result = method(conn, *args, **kwargs) except psycopg2.Error as error: - if conn.closed: - if not retry: - retry.append(conn) - self.ioloop.add_future(conn.connect(), when_available) - return - else: - future.set_exception(self._no_conn_availble_error) - else: - future.set_exc_info(sys.exc_info()) - log.debug(2) - self.putconn(conn) - return + log.debug("Method failed synchronously") + return self._retry(retry, when_available, conn, keep, future) if not async: + if not keep: + self.putconn(conn) future.set_result(future_or_result) - log.debug(3) - self.putconn(conn) return - chain_future(future_or_result, future) - if not keep: - future.add_done_callback(lambda f: self.putconn(conn)) + def when_done(rfut): + try: + result = rfut.result() + except psycopg2.Error as error: + log.debug("Method failed Asynchronously") + return self._retry(retry, when_available, conn, keep, future) + + if not keep: + self.putconn(conn) + future.set_result(result) + + self.ioloop.add_future(future_or_result, when_done) if not connection: self.ioloop.add_future(self.getconn(ping=False), when_available) @@ -479,6 +482,20 @@ def when_available(fut): when_available(f) return future + def _retry(self, retry, what, conn, keep, future): + if conn.closed: + if not retry: + retry.append(conn) + self.ioloop.add_future(conn.connect(), what) + return + else: + future.set_exception(self._no_conn_availble_error) + else: + future.set_exc_info(sys.exc_info()) + if not keep: + self.putconn(conn) + return + def _reanimate(self): assert self.conns.dead, "BUG: don't call reanimate when there is no one to reanimate" @@ -601,7 +618,7 @@ class Connection(object): :param list setsession: List of intial sql commands to be executed once connection is established. - If any of the commands failes, the connection will be closed. + If any of the commands fails, the connection will be closed. **NOTE:** The commands will be executed as one transaction block. .. _Data Source Name: http://en.wikipedia.org/wiki/Data_Source_Name @@ -639,6 +656,7 @@ def connect(self): try: self.connection = psycopg2.connect(self.dsn, **kwargs) except psycopg2.Error as error: + self.connection = None future.set_exc_info(sys.exc_info()) return future @@ -657,6 +675,7 @@ def on_connect(on_connect_future): self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE) self.ioloop.add_future(future, self._set_server_version) + self.ioloop.add_future(future, self._close_on_fail) return future @@ -665,6 +684,12 @@ def _set_server_version(self, future): return self.server_version = self.connection.server_version + def _close_on_fail(self, future): + # If connection attempt evetually fails - marks connection as closed by ourselves + # since psycopg2 does not do that for us (on connection attempts) + if future.exception(): + self.connection = None + def _io_callback(self, future, result, fd=None, events=None): try: state = self.connection.poll() @@ -680,7 +705,7 @@ def _io_callback(self, future, result, fd=None, events=None): elif state == POLL_WRITE: self.ioloop.update_handler(self.fileno, IOLoop.WRITE) else: - future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state))) + future.set_exception(psycopg2.OperationalError("poll() returned %s" % state)) def ping(self): """ @@ -800,7 +825,7 @@ def transaction(self, The class returned must be a subclass of `psycopg2.extensions.cursor`_. See `Connection and cursor factories`_ for details. Defaults to ``None``. :param bool auto_rollback: - If one of the transaction statements failes, try to automatically + If one of the transaction statements fails, try to automatically execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would not be raised, but only logged. diff --git a/tests.py b/tests.py index f1a36ff..39a3773 100644 --- a/tests.py +++ b/tests.py @@ -10,7 +10,9 @@ import logging import datetime import threading +import socket from tornado.concurrent import Future +import subprocess from tornado import gen from tornado.testing import unittest, AsyncTestCase, gen_test @@ -27,14 +29,19 @@ db_password = os.environ.get('MOMOKO_TEST_PASSWORD', '') db_host = os.environ.get('MOMOKO_TEST_HOST', '') db_port = os.environ.get('MOMOKO_TEST_PORT', 5432) +db_proxy_port = os.environ.get('MOMOKO_TEST_PROXY_PORT', 15432) test_hstore = True if os.environ.get('MOMOKO_TEST_HSTORE', False) == '1' else False good_dsn = 'dbname=%s user=%s password=%s host=%s port=%s' % ( db_database, db_user, db_password, db_host, db_port) +good_proxy_dsn = 'dbname=%s user=%s password=%s host=%s port=%s' % ( + db_database, db_user, db_password, db_host, db_proxy_port) bad_dsn = 'dbname=%s user=%s password=xx%s host=%s port=%s' % ( 'db', 'user', 'password', "127.0.0.127", 11111) local_bad_dsn = 'dbname=%s user=%s password=xx%s' % ( 'db', 'user', 'password') +TCPPROXY_PATH = os.environ.get('MOMOKO_TCPPROXY_PATH', "./tcproxy/src/tcproxy") + assert (db_database or db_user or db_password or db_host or db_port) is not None, ( 'Environment variables for the unit tests are not set. Please set the following ' 'variables: MOMOKO_TEST_DB, MOMOKO_TEST_USER, MOMOKO_TEST_PASSWORD, ' @@ -61,6 +68,7 @@ class BaseTest(AsyncTestCase): dsn = good_dsn + good_dsn = good_dsn # This is a hack to overcome lack of "yield from" in Python < 3.3. # The goal is to support several set_up methods in inheriatnace chain @@ -342,6 +350,8 @@ def test_ping_with_named_cursor(self): """Test whether Connection.ping works fine with named cursors. Issue #74""" conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor) yield conn.ping() + + # # Pool tests # @@ -388,6 +398,13 @@ def close_connections(self, db, amount=None): conn.close() amount -= 1 + shutter = close_connections + + def total_close(self, db): + self.shutter(db) + for conn in db.conns.busy.union(db.conns.free).union(db.conns.dead): + conn.dsn = bad_dsn + @gen_test def run_and_check_query(self, db): cursor = yield db.execute("SELECT 6, 19, 24;") @@ -406,6 +423,34 @@ class PoolBaseDataTest(PoolBaseTest, BaseDataTest): pass +class ProxyMixIn(object): + dsn = good_proxy_dsn + good_dsn = good_proxy_dsn + + def start_proxy(self): + # Dirty way to make sure there are no proxies leftovers + subprocess.call(("killall", TCPPROXY_PATH), stderr=open("/dev/null", "w")) + + proxy_conf = "127.0.0.1:%s -> %s:%s" % (db_proxy_port, db_host, db_port) + self.proxy = subprocess.Popen((TCPPROXY_PATH, proxy_conf,)) + time.sleep(0.1) + + def terminate_proxy(self): + self.proxy.terminate() + + def kill_connections(self, db, amount=None): + self.terminate_proxy() + self.start_proxy() + + def set_up_00(self): + self.start_proxy() + + def tear_down_00(self): + self.terminate_proxy() + + shutter = kill_connections + + class MomokoPoolTest(PoolBaseTest): @gen_test def test_connect(self): @@ -462,11 +507,11 @@ def test_mogrify_error(self): @gen_test def test_transaction_with_reconnect(self): - """Test whether transaction works after reconnect""" + """Test whether transaction works after connections were closed""" # Added result counting, since there was a bug in retry mechanism that caused # double-execution of query after reconnect - self.close_connections(self.db) + self.shutter(self.db) yield self.db.transaction(("INSERT INTO unit_test_int_table VALUES (1)",)) cursor = yield self.db.execute("SELECT COUNT(1) FROM unit_test_int_table") self.assertEqual(cursor.fetchall(), [(1,)]) @@ -484,10 +529,10 @@ def test_getconn_putconn(self): @gen_test def test_getconn_putconn_with_reconnect(self): - """Testing getconn/putconn functionality with reconnect""" + """Testing getconn/putconn functionality with reconnect after closing connections""" for i in range(self.pool_size * 5): # Run many times to check that connections get recycled properly - self.close_connections(self.db) + self.shutter(self.db) conn = yield self.db.getconn() for j in range(10): cursor = yield conn.execute("SELECT %s", (j,)) @@ -496,9 +541,21 @@ def test_getconn_putconn_with_reconnect(self): @gen_test def test_getconn_manage(self): - """Testing getcontest_getconn_putconn_with_reconnectn + context manager functionality""" + """Testing getcontest_getconn_putconn + context manager functionality""" + for i in range(self.pool_size * 5): + # Run many times to check that connections get recycled properly + conn = yield self.db.getconn() + with self.db.manage(conn): + for j in range(10): + cursor = yield conn.execute("SELECT %s", (j,)) + self.assertEqual(cursor.fetchall(), [(j, )]) + + @gen_test + def test_getconn_manage_with_reconnect(self): + """Testing getcontest_getconn_putconn_with_reconnect + context manager functionality""" for i in range(self.pool_size * 5): # Run many times to check that connections get recycled properly + self.shutter(self.db) conn = yield self.db.getconn() with self.db.manage(conn): for j in range(10): @@ -508,7 +565,7 @@ def test_getconn_manage(self): @gen_test def test_getconn_manage_with_exception(self): """Testing getconn + context manager functionality + deliberate exception""" - self.close_connections(self.db) + self.shutter(self.db) conn = yield self.db.getconn(ping=False) with self.db.manage(conn): try: @@ -518,6 +575,10 @@ def test_getconn_manage_with_exception(self): self.assertEqual(len(self.db.conns.busy), 0, msg="Some connections were not recycled") +class MomokoPoolDataTestProxy(ProxyMixIn, MomokoPoolDataTest): + pass + + class MomokoPoolServerSideCursorTest(PoolBaseDataTest): @gen_test def test_server_side_cursor(self): @@ -601,15 +662,19 @@ def test_request_queueing(self): def test_parallel_queries_after_reconnect_all(self): """Testing that pool still queries database in parallel after ALL connections were closeded""" - self.close_connections(self.db) + self.shutter(self.db) self.run_parallel_queries() def test_parallel_queries_after_reconnect_some(self): """Testing that pool still queries database in parallel after SOME connections were closed""" - self.close_connections(self.db, amount=self.pool_size/2) + self.shutter(self.db, amount=self.pool_size/2) self.run_parallel_queries() +class MomokoPoolParallelTestProxy(ProxyMixIn, MomokoPoolParallelTest): + pass + + class MomokoPoolStretchTest(MomokoPoolParallelTest): pool_size = 1 max_size = 5 @@ -629,12 +694,12 @@ def test_dont_stretch(self): def test_dont_stretch_after_reconnect(self): """Testing that reconnecting dead connection does not trigger pool stretch""" - self.close_connections(self.db) + self.shutter(self.db) self.test_dont_stretch() def test_stretch_after_disonnect(self): """Testing that stretch works after disconnect""" - self.close_connections(self.db) + self.shutter(self.db) self.test_parallel_queries() @gen_test @@ -669,6 +734,10 @@ def test_stretch_genconn(self): self.assertEqual(self.db.conns.total, 3) +class MomokoPoolStretchTestProxy(ProxyMixIn, MomokoPoolStretchTest): + pass + + class MomokoPoolVolatileDbTest(PoolBaseTest): pool_size = 3 @@ -686,8 +755,9 @@ def test_startup_local(self): def test_reconnect(self): """Testing if we can reconnect if connections dies""" - db = self.build_pool_sync(dsn=good_dsn) - self.close_connections(db) + db = self.build_pool_sync(dsn=self.good_dsn) + log.debug("Killing connections") + self.shutter(db) self.run_and_check_query(db) def test_reconnect_interval_good_path(self): @@ -710,21 +780,26 @@ def test_reconnect_interval_bad_path(self): except psycopg2.DatabaseError: pass + @gen_test + def test_ping_error(self): + """Test that getconn uses ping properly to detect database unavailablity""" + db = yield self.build_pool(dsn=self.good_dsn, size=3) + self.total_close(db) + try: + yield db.getconn() + except db.DatabaseNotAvailable as err: + pass + @gen_test def test_abort_waiting_queue(self): """Testing that waiting queue is aborted properly when all connections are dead""" - db = yield self.build_pool(dsn=good_dsn, size=1) + db = yield self.build_pool(dsn=self.good_dsn, size=1) f1 = db.execute("SELECT 1") f2 = db.execute("SELECT 1") self.assertEqual(len(db.conns.waiting_queue), 1) - def total_close(f): - self.close_connections(db) - for conn in db.conns.dead: - conn.dsn = bad_dsn - - f1.add_done_callback(total_close) + f1.add_done_callback(lambda f: self.total_close(db)) try: yield [f1, f2] @@ -733,6 +808,10 @@ def total_close(f): self.assertEqual(len(db.conns.waiting_queue), 0) +class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest): + pass + + class MomokoPoolPartiallyConnectedTest(PoolBaseTest): raise_connect_errors = True pool_size = 3