diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index c6d6cf08..5d77881f 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -24,7 +24,6 @@ jobs: - '2.8' - '2.x-latest' python: - - '2.7' - '3.5' - '3.6' - '3.7' @@ -116,7 +115,6 @@ jobs: - '1.10' - '2.8' python: - - '2.7' - '3.10' steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index b1f6e082..7a0eae78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,47 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Reusable testing workflow for integration with tarantool artifacts (PR #192). +- Connection pool with master discovery (PR #211, #196). + + ConnectionPool is supported only for Python 3.7 or newer. + Authenticated user must be able to call `box.info` on instances. + + ConnectionPool updates information about each server state (RO/RW) + on initial connect and then asynchronously in separate threads. + Application retries must be written considering the asynchronous nature + of cluster state refresh. User does not need to use any synchronization + mechanisms in requests, it's all handled with ConnectionPool methods. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + A connection is chosen based on a request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require to set the mode explicitly. + + Example: + ```python + pool = tarantool.ConnectionPool( + addrs=[ + {'host': '108.177.16.0', 'port': 3301}, + {'host': '108.177.16.0', 'port': 3302}, + ], + user='test', + password='test',) + + pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW) + ``` ### Changed +- **Breaking**: drop Python 2 support (PR #207). - **Breaking**: change binary types encode/decode for Python 3 to support working with varbinary (PR #211, #105). With Python 2 the behavior of the connector remains the same. diff --git a/setup.py b/setup.py index 2f4746f0..c57573d5 100755 --- a/setup.py +++ b/setup.py @@ -84,5 +84,6 @@ def find_version(*file_paths): command_options=command_options, install_requires=[ 'msgpack>=0.4.0', - ] + ], + python_requires='>=3', ) diff --git a/tarantool/__init__.py b/tarantool/__init__.py index 8ae0c44c..8f7b6d07 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- # pylint: disable=C0301,W0105,W0401,W0614 +import sys + from tarantool.connection import Connection from tarantool.mesh_connection import MeshConnection from tarantool.const import ( @@ -76,3 +78,8 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None, __all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema', 'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning', 'SchemaError', 'dbapi'] + +# ConnectionPool is supported only for Python 3.7 or newer. +if sys.version_info.major >= 3 and sys.version_info.minor >= 7: + from tarantool.connection_pool import ConnectionPool, Mode + __all__.extend(['ConnectionPool', 'Mode']) diff --git a/tarantool/connection.py b/tarantool/connection.py index 0ff39b58..73574e46 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -8,6 +8,7 @@ import time import errno import socket +import abc import ctypes import ctypes.util @@ -76,8 +77,92 @@ ENCODING_DEFAULT, ) +# Based on https://realpython.com/python-interface/ +class ConnectionInterface(metaclass=abc.ABCMeta): + @classmethod + def __subclasshook__(cls, subclass): + return (hasattr(subclass, 'close') and + callable(subclass.close) and + hasattr(subclass, 'is_closed') and + callable(subclass.is_closed) and + hasattr(subclass, 'connect') and + callable(subclass.connect) and + hasattr(subclass, 'call') and + callable(subclass.call) and + hasattr(subclass, 'eval') and + callable(subclass.eval) and + hasattr(subclass, 'replace') and + callable(subclass.replace) and + hasattr(subclass, 'insert') and + callable(subclass.insert) and + hasattr(subclass, 'delete') and + callable(subclass.delete) and + hasattr(subclass, 'upsert') and + callable(subclass.upsert) and + hasattr(subclass, 'update') and + callable(subclass.update) and + hasattr(subclass, 'ping') and + callable(subclass.ping) and + hasattr(subclass, 'select') and + callable(subclass.select) and + hasattr(subclass, 'execute') and + callable(subclass.execute) or + NotImplemented) + + @abc.abstractmethod + def close(self): + raise NotImplementedError + + @abc.abstractmethod + def is_closed(self): + raise NotImplementedError + + @abc.abstractmethod + def connect(self): + raise NotImplementedError + + @abc.abstractmethod + def call(self, func_name, *args, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def eval(self, expr, *args, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def replace(self, space_name, values): + raise NotImplementedError + + @abc.abstractmethod + def insert(self, space_name, values): + raise NotImplementedError + + @abc.abstractmethod + def delete(self, space_name, key, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def upsert(self, space_name, tuple_value, op_list, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def update(self, space_name, key, op_list, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def ping(self, notime): + raise NotImplementedError + + @abc.abstractmethod + def select(self, space_name, key, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def execute(self, query, params, **kwargs): + raise NotImplementedError + -class Connection(object): +class Connection(ConnectionInterface): ''' Represents connection to the Tarantool server. diff --git a/tarantool/connection_pool.py b/tarantool/connection_pool.py new file mode 100644 index 00000000..176c29b8 --- /dev/null +++ b/tarantool/connection_pool.py @@ -0,0 +1,474 @@ +# -*- coding: utf-8 -*- + +import abc +import itertools +import queue +import threading +import time +import typing +from dataclasses import dataclass, field +from enum import Enum + +from tarantool.connection import Connection, ConnectionInterface +from tarantool.const import ( + CONNECTION_TIMEOUT, + POOL_INSTANCE_RECONNECT_DELAY, + POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + POOL_REFRESH_DELAY, + SOCKET_TIMEOUT +) +from tarantool.error import ( + ClusterConnectWarning, + PoolTolopogyError, + PoolTolopogyWarning, + ConfigurationError, + DatabaseError, + NetworkError, + NetworkWarning, + tnt_strerror, + warn +) +from tarantool.utils import ENCODING_DEFAULT +from tarantool.mesh_connection import validate_address + + +class Mode(Enum): + ANY = 1 + RW = 2 + RO = 3 + PREFER_RW = 4 + PREFER_RO = 5 + + +class Status(Enum): + HEALTHY = 1 + UNHEALTHY = 2 + + +@dataclass +class InstanceState(): + status: Status = Status.UNHEALTHY + ro: typing.Optional[bool] = None + + +def QueueFactory(): + return queue.Queue(maxsize=1) + + +@dataclass +class PoolUnit(): + addr: dict + conn: Connection + input_queue: queue.Queue = field(default_factory=QueueFactory) + output_queue: queue.Queue = field(default_factory=QueueFactory) + thread: typing.Optional[threading.Thread] = None + state: InstanceState = field(default_factory=InstanceState) + # request_processing_enabled is used to stop requests processing + # in background thread on close or destruction. + request_processing_enabled: bool = False + + +# Based on https://realpython.com/python-interface/ +class StrategyInterface(metaclass=abc.ABCMeta): + @classmethod + def __subclasshook__(cls, subclass): + return (hasattr(subclass, '__init__') and + callable(subclass.__init__) and + hasattr(subclass, 'update') and + callable(subclass.update) and + hasattr(subclass, 'getnext') and + callable(subclass.getnext) or + NotImplemented) + + @abc.abstractmethod + def __init__(self, pool): + raise NotImplementedError + + @abc.abstractmethod + def update(self): + raise NotImplementedError + + @abc.abstractmethod + def getnext(self, mode): + raise NotImplementedError + +class RoundRobinStrategy(StrategyInterface): + """ + Simple round-robin connection rotation + """ + def __init__(self, pool): + self.ANY_iter = None + self.RW_iter = None + self.RO_iter = None + self.pool = pool + self.rebuild_needed = True + + def build(self): + ANY_pool = [] + RW_pool = [] + RO_pool = [] + + for key in self.pool: + state = self.pool[key].state + + if state.status == Status.UNHEALTHY: + continue + + ANY_pool.append(key) + + if state.ro == False: + RW_pool.append(key) + else: + RO_pool.append(key) + + if len(ANY_pool) > 0: + self.ANY_iter = itertools.cycle(ANY_pool) + else: + self.ANY_iter = None + + if len(RW_pool) > 0: + self.RW_iter = itertools.cycle(RW_pool) + else: + self.RW_iter = None + + if len(RO_pool) > 0: + self.RO_iter = itertools.cycle(RO_pool) + else: + self.RO_iter = None + + self.rebuild_needed = False + + def update(self): + self.rebuild_needed = True + + def getnext(self, mode): + if self.rebuild_needed: + self.build() + + if mode == Mode.ANY: + if self.ANY_iter is not None: + return next(self.ANY_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.RW: + if self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy rw instance in pool") + elif mode == Mode.RO: + if self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy ro instance in pool") + elif mode == Mode.PREFER_RO: + if self.RO_iter is not None: + return next(self.RO_iter) + elif self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.PREFER_RW: + if self.RW_iter is not None: + return next(self.RW_iter) + elif self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + + +@dataclass +class PoolTask(): + method_name: str + args: tuple + kwargs: dict + + +class ConnectionPool(ConnectionInterface): + ''' + Represents pool of connections to the cluster of Tarantool servers. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + Connection is selected based on request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require to set the mode explicitly. + ''' + def __init__(self, + addrs, + user=None, + password=None, + socket_timeout=SOCKET_TIMEOUT, + reconnect_max_attempts=POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + reconnect_delay=POOL_INSTANCE_RECONNECT_DELAY, + connect_now=True, + encoding=ENCODING_DEFAULT, + call_16=False, + connection_timeout=CONNECTION_TIMEOUT, + strategy_class=RoundRobinStrategy, + refresh_delay=POOL_REFRESH_DELAY): + ''' + Initialize connections to the cluster of servers. + + :param list addrs: List of {host: , port:} dictionaries, + describing server addresses. + :param int reconnect_max_attempts: Max attempts to reconnect + for each connection in the pool. Be careful with reconnect + parameters in ConnectionPool since every status refresh is + also a request with reconnection. Default is 0 (fail after + first attempt). + :param float reconnect_delay: Time between reconnect + attempts for each connection in the pool. Be careful with + reconnect parameters in ConnectionPool since every status + refresh is also a request with reconnection. Default is 0. + :param StrategyInterface strategy_class: Class for choosing + instance based on request mode. By default, round-robin + strategy is used. + :param int refresh_delay: Minimal time between RW/RO status + refreshes. + ''' + + if not isinstance(addrs, list) or len(addrs) == 0: + raise ConfigurationError("addrs must be non-empty list") + + # Verify addresses. + for addr in addrs: + ok, msg = validate_address(addr) + if not ok: + raise ConfigurationError(msg) + self.addrs = addrs + + # Create connections + self.pool = {} + self.refresh_delay = refresh_delay + self.strategy = strategy_class(self.pool) + + for addr in self.addrs: + key = self._make_key(addr) + self.pool[key] = PoolUnit( + addr=addr, + conn=Connection( + host=addr['host'], + port=addr['port'], + user=user, + password=password, + socket_timeout=socket_timeout, + reconnect_max_attempts=reconnect_max_attempts, + reconnect_delay=reconnect_delay, + connect_now=False, # Connect in ConnectionPool.connect() + encoding=encoding, + call_16=call_16, + connection_timeout=connection_timeout) + ) + + if connect_now: + self.connect() + + def __del__(self): + self.close() + + def _make_key(self, addr): + return '{0}:{1}'.format(addr['host'], addr['port']) + + def _get_new_state(self, unit): + conn = unit.conn + + if conn.is_closed(): + try: + conn.connect() + except NetworkError as e: + msg = "Failed to connect to {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, ClusterConnectWarning) + return InstanceState(Status.UNHEALTHY) + + try: + resp = conn.call('box.info') + except NetworkError as e: + msg = "Failed to get box.info for {0}:{1}, reason: {2}".format( + unit.addr['host'], unit.addr['port'], repr(e)) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + ro = resp.data[0]['ro'] + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + status = resp.data[0]['status'] + + if status != 'running': + msg = "{0}:{1} instance status is not 'running'".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + return InstanceState(Status.HEALTHY, ro) + + def _refresh_state(self, key): + unit = self.pool[key] + + state = self._get_new_state(unit) + if state != unit.state: + unit.state = state + self.strategy.update() + + def close(self): + for unit in self.pool.values(): + unit.request_processing_enabled = False + unit.thread.join() + + if not unit.conn.is_closed(): + unit.conn.close() + + def is_closed(self): + return all(unit.request_processing_enabled == False for unit in self.pool.values()) + + def _request_process_loop(self, key, unit, last_refresh): + while unit.request_processing_enabled: + if not unit.input_queue.empty(): + task = unit.input_queue.get() + method = getattr(Connection, task.method_name) + try: + resp = method(unit.conn, *task.args, **task.kwargs) + except Exception as e: + unit.output_queue.put(e) + else: + unit.output_queue.put(resp) + + now = time.time() + + if now - last_refresh > self.refresh_delay: + self._refresh_state(key) + last_refresh = time.time() + + def connect(self): + for key in self.pool: + unit = self.pool[key] + + self._refresh_state(key) + last_refresh = time.time() + + unit.thread = threading.Thread( + target=self._request_process_loop, + args=(key, unit, last_refresh), + daemon=True, + ) + unit.request_processing_enabled = True + unit.thread.start() + + def _send(self, mode, method_name, *args, **kwargs): + key = self.strategy.getnext(mode) + unit = self.pool[key] + + task = PoolTask(method_name=method_name, args=args, kwargs=kwargs) + + unit.input_queue.put(task) + resp = unit.output_queue.get() + + if isinstance(resp, Exception): + raise resp + + return resp + + def call(self, func_name, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'call', func_name, *args) + + def eval(self, expr, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'eval', expr, *args) + + def replace(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'replace', space_name, values) + + def insert(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'insert', space_name, values) + + def delete(self, space_name, key, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'delete', space_name, key, **kwargs) + + def upsert(self, space_name, tuple_value, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'upsert', space_name, tuple_value, + op_list, **kwargs) + + def update(self, space_name, key, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'update', space_name, key, + op_list, **kwargs) + + def ping(self, *, mode=None, **kwargs): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'ping', **kwargs) + + def select(self, space_name, key, *, mode=Mode.ANY, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is + ANY). + ''' + + return self._send(mode, 'select', space_name, key, **kwargs) + + def execute(self, query, params=None, *, mode=None): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'execute', query, params) diff --git a/tarantool/const.py b/tarantool/const.py index 0db35978..9eb834a8 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -96,3 +96,9 @@ RECONNECT_DELAY = 0.1 # Default cluster nodes list refresh interval (seconds) CLUSTER_DISCOVERY_DELAY = 60 +# Default cluster nodes state refresh interval (seconds) +POOL_REFRESH_DELAY = 1 +# Default maximum number of attempts to reconnect for pool instance +POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0 +# Default delay between attempts to reconnect (seconds) +POOL_INSTANCE_RECONNECT_DELAY = 0 diff --git a/tarantool/error.py b/tarantool/error.py index 78519b68..c7165a9b 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -223,6 +223,21 @@ class ClusterDiscoveryWarning(UserWarning): pass +class ClusterConnectWarning(UserWarning): + '''Warning related to cluster pool connection''' + pass + + +class PoolTolopogyWarning(UserWarning): + '''Warning related to ro/rw cluster pool topology''' + pass + + +class PoolTolopogyError(DatabaseError): + '''Exception raised due to unsatisfying ro/rw cluster pool topology''' + pass + + # always print this warnings warnings.filterwarnings("always", category=NetworkWarning) diff --git a/test/suites/__init__.py b/test/suites/__init__.py index 8e2eafc1..1868ad42 100644 --- a/test/suites/__init__.py +++ b/test/suites/__init__.py @@ -10,6 +10,7 @@ from .test_protocol import TestSuite_Protocol from .test_reconnect import TestSuite_Reconnect from .test_mesh import TestSuite_Mesh +from .test_pool import TestSuite_Pool from .test_execute import TestSuite_Execute from .test_dbapi import TestSuite_DBAPI from .test_encoding import TestSuite_Encoding @@ -18,7 +19,7 @@ TestSuite_Schema_BinaryConnection, TestSuite_Request, TestSuite_Protocol, TestSuite_Reconnect, TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI, - TestSuite_Encoding) + TestSuite_Encoding, TestSuite_Pool) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/test/suites/lib/skip.py b/test/suites/lib/skip.py index f8f5a475..284b70b6 100644 --- a/test/suites/lib/skip.py +++ b/test/suites/lib/skip.py @@ -18,8 +18,18 @@ def wrapper(self, *args, **kwargs): func(self, *args, **kwargs) if not hasattr(self, 'tnt_version'): + srv = None + + if hasattr(self, 'servers'): + srv = self.servers[0] + + if hasattr(self, 'srv'): + srv = self.srv + + assert srv is not None + self.__class__.tnt_version = re.match( - r'[\d.]+', self.srv.admin('box.info.version')[0] + r'[\d.]+', srv.admin('box.info.version')[0] ).group() tnt_version = pkg_resources.parse_version(self.tnt_version) @@ -34,9 +44,8 @@ def wrapper(self, *args, **kwargs): return wrapper -def skip_or_run_test_python_major(func, REQUIRED_PYTHON_MAJOR, msg): - """Decorator to skip or run tests depending on the Python major - version. +def skip_or_run_test_python(func, REQUIRED_PYTHON_VERSION, msg): + """Decorator to skip or run tests depending on the Python version. Also, it can be used with the 'setUp' method for skipping the whole test suite. @@ -47,9 +56,12 @@ def wrapper(self, *args, **kwargs): if func.__name__ == 'setUp': func(self, *args, **kwargs) - major = sys.version_info.major - if major != REQUIRED_PYTHON_MAJOR: - self.skipTest('Python %s connector %s' % (major, msg)) + ver = sys.version_info + python_version_str = '%d.%d' % (ver.major, ver.minor) + python_version = pkg_resources.parse_version(python_version_str) + support_version = pkg_resources.parse_version(REQUIRED_PYTHON_VERSION) + if python_version < support_version: + self.skipTest('Python %s connector %s' % (python_version, msg)) if func.__name__ != 'setUp': func(self, *args, **kwargs) @@ -81,11 +93,11 @@ def skip_or_run_varbinary_test(func): 'does not support VARBINARY type') -def skip_or_run_mp_bin_test(func): - """Decorator to skip or run mp_bin-related tests depending on +def skip_or_run_conn_pool_test(func): + """Decorator to skip or run ConnectionPool tests depending on the Python version. - - Python 2 connector do not support mp_bin. """ - return skip_or_run_test_python_major(func, 3, 'does not support mp_bin') \ No newline at end of file + return skip_or_run_test_python(func, '3.7', + 'does not support ConnectionPool') + diff --git a/test/suites/test_encoding.py b/test/suites/test_encoding.py index 1ee0f4aa..f5f5f8c0 100644 --- a/test/suites/test_encoding.py +++ b/test/suites/test_encoding.py @@ -6,7 +6,7 @@ import unittest import tarantool -from .lib.skip import skip_or_run_mp_bin_test, skip_or_run_varbinary_test +from .lib.skip import skip_or_run_varbinary_test from .lib.tarantool_server import TarantoolServer class TestSuite_Encoding(unittest.TestCase): @@ -99,7 +99,6 @@ def test_01_02_string_decode_for_encoding_utf8_behavior(self): resp = self.con_encoding_utf8.eval("return box.space['%s']:get('%s')" % (space, data)) self.assertSequenceEqual(resp, [[data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_01_03_bytes_encode_for_encoding_utf8_behavior(self): data_id = 103 @@ -111,7 +110,6 @@ def test_01_03_bytes_encode_for_encoding_utf8_behavior(self): resp = self.con_encoding_utf8.select(space, [ data ], index='varbin') self.assertSequenceEqual(resp, [[data_id, data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_01_04_varbinary_decode_for_encoding_utf8_behavior(self): data_id = 104 @@ -162,7 +160,6 @@ def test_02_03_bytes_encode_for_encoding_none_behavior(self): resp = self.con_encoding_none.select(space, [data]) self.assertSequenceEqual(resp, [[data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_02_04_varbinary_decode_for_encoding_none_behavior(self): data_id = 204 diff --git a/test/suites/test_pool.py b/test/suites/test_pool.py new file mode 100644 index 00000000..12263d79 --- /dev/null +++ b/test/suites/test_pool.py @@ -0,0 +1,539 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import sys +import time +import unittest +import warnings + +import tarantool +from tarantool.error import PoolTolopogyError, DatabaseError, NetworkError + +from .lib.skip import skip_or_run_sql_test, skip_or_run_conn_pool_test +from .lib.tarantool_server import TarantoolServer + + +def create_server(_id): + srv = TarantoolServer() + srv.script = 'test/suites/box.lua' + srv.start() + srv.admin("box.schema.user.create('test', {password = 'test', " + + "if_not_exists = true})") + srv.admin("box.schema.user.grant('test', 'execute', 'universe')") + srv.admin("box.schema.space.create('test')") + srv.admin(r"box.space.test:format({" + +r" { name = 'pk', type = 'string' }," + + r" { name = 'id', type = 'number', is_nullable = true }" + + r"})") + srv.admin(r"box.space.test:create_index('pk'," + + r"{ unique = true," + + r" parts = {{field = 1, type = 'string'}}})") + srv.admin(r"box.space.test:create_index('id'," + + r"{ unique = true," + + r" parts = {{field = 2, type = 'number', is_nullable=true}}})") + srv.admin("box.schema.user.grant('test', 'read,write', 'space', 'test')") + srv.admin("json = require('json')") + + # Create srv_id function (for testing purposes). + srv.admin("function srv_id() return %s end" % _id) + return srv + + +@unittest.skipIf(sys.platform.startswith("win"), + 'Pool tests on windows platform are not supported') +class TestSuite_Pool(unittest.TestCase): + def set_ro(self, srv, read_only): + if read_only: + req = r'box.cfg{read_only = true}' + else: + req = r'box.cfg{read_only = false}' + + srv.admin(req) + + def set_cluster_ro(self, read_only_list): + assert len(self.servers) == len(read_only_list) + + for i in range(len(self.servers)): + self.set_ro(self.servers[i], read_only_list[i]) + + def retry(self, func, count=5, timeout=0.5): + for i in range(count): + try: + func() + except Exception as e: + if i + 1 == count: + raise e + + time.sleep(timeout) + + @classmethod + def setUpClass(self): + print(' POOL '.center(70, '='), file=sys.stderr) + print('-' * 70, file=sys.stderr) + + @skip_or_run_conn_pool_test + def setUp(self): + # Create five servers and extract helpful fields for tests. + self.servers = [] + self.addrs = [] + self.servers_count = 5 + for i in range(self.servers_count): + srv = create_server(i) + self.servers.append(srv) + self.addrs.append({'host': srv.host, 'port': srv.args['primary']}) + + def test_00_basic(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.RW), + [False]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.RO), + [True]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RW), + [False]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RO), + [True]) + + def test_01_roundrobin(self): + self.set_cluster_ro([False, False, True, False, True]) + RW_ports = set([str(self.addrs[0]['port']), str(self.addrs[1]['port']), str(self.addrs[3]['port'])]) + RO_ports = set([str(self.addrs[2]['port']), str(self.addrs[4]['port'])]) + all_ports = set() + for addr in self.addrs: + all_ports.add(str(addr['port'])) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + def get_port(self, mode): + resp = self.pool.eval('return box.cfg.listen', mode=mode) + self.assertIsInstance(resp.data[0], str) + return resp.data[0] + + # Expect ANY iterate through all instances. + ANY_ports_result = set() + for i in range(len(self.servers)): + ANY_ports_result.add(get_port(self, tarantool.Mode.ANY)) + + self.assertSetEqual(ANY_ports_result, all_ports) + + # Expect RW iterate through all RW instances. + RW_ports_result = set() + for i in range(len(self.servers)): + RW_ports_result.add(get_port(self, tarantool.Mode.RW)) + + self.assertSetEqual(RW_ports_result, RW_ports) + + # Expect RO iterate through all RO instances. + RO_ports_result = set() + for i in range(len(self.servers)): + RO_ports_result.add(get_port(self, tarantool.Mode.RO)) + + self.assertSetEqual(RO_ports_result, RO_ports) + + # Expect PREFER_RW iterate through all RW instances if there is at least one. + PREFER_RW_ports_result = set() + for i in range(len(self.servers)): + PREFER_RW_ports_result.add(get_port(self, tarantool.Mode.PREFER_RW)) + + self.assertSetEqual(PREFER_RW_ports_result, RW_ports) + + # Expect PREFER_RO iterate through all RO instances if there is at least one. + PREFER_RO_ports_result = set() + for i in range(len(self.servers)): + PREFER_RO_ports_result.add(get_port(self, tarantool.Mode.PREFER_RO)) + + self.assertSetEqual(PREFER_RO_ports_result, RO_ports) + + # Setup cluster with no RW. + self.set_cluster_ro([True, True, True, True, True]) + + # Expect RW to fail if there are no RW. + def expect_RW_to_fail_if_there_are_no_RW(): + with self.assertRaises(PoolTolopogyError): + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW) + + self.retry(func=expect_RW_to_fail_if_there_are_no_RW) + + # Expect PREFER_RW iterate through all instances if there are no RW. + def expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW(): + PREFER_RW_ports_result_all_ro = set() + for i in range(len(self.servers)): + PREFER_RW_ports_result_all_ro.add(get_port(self, tarantool.Mode.PREFER_RW)) + + self.assertSetEqual(PREFER_RW_ports_result_all_ro, all_ports) + + self.retry(func=expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW) + + # Setup cluster with no RO. + self.set_cluster_ro([False, False, False, False, False]) + + # Expect RO to fail if there are no RO. + def expect_RO_to_fail_if_there_are_no_RO(): + with self.assertRaises(PoolTolopogyError): + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RO) + + self.retry(func=expect_RO_to_fail_if_there_are_no_RO) + + # Expect PREFER_RO iterate through all instances if there are no RO. + def expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO(): + PREFER_RO_ports_result_all_rw = set() + for i in range(len(self.servers)): + PREFER_RO_ports_result_all_rw.add(get_port(self, tarantool.Mode.PREFER_RO)) + + self.assertSetEqual(PREFER_RO_ports_result_all_rw, all_ports) + + self.retry(func=expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO) + + def test_02_exception_raise(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaises(DatabaseError): + self.pool.call('non_existing_procedure', mode=tarantool.Mode.ANY) + + def test_03_insert(self): + self.set_cluster_ro([True, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.insert('test', ['test_03_insert_1', 1]), + [['test_03_insert_1', 1]]) + self.assertSequenceEqual( + self.pool.insert('test', ['test_03_insert_2', 2], + mode=tarantool.Mode.RW), + [['test_03_insert_2', 2]]) + + conn_2 = tarantool.connect( + host=self.addrs[2]['host'], + port=self.addrs[2]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + conn_2.select('test', 'test_03_insert_1'), + [['test_03_insert_1', 1]]) + + def test_04_delete(self): + self.set_cluster_ro([True, True, True, False, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_3 = tarantool.connect( + host=self.addrs[3]['host'], + port=self.addrs[3]['port'], + user='test', + password='test') + + conn_3.insert('test', ['test_04_delete_1', 1]) + conn_3.insert('test', ['test_04_delete_2', 2]) + + self.assertSequenceEqual( + self.pool.delete('test', 'test_04_delete_1'), + [['test_04_delete_1', 1]]) + self.assertSequenceEqual( + conn_3.select('test', 'test_04_delete_1'), + []) + + self.assertSequenceEqual( + self.pool.delete('test', 2, index='id', mode=tarantool.Mode.RW), + [['test_04_delete_2', 2]]) + self.assertSequenceEqual( + conn_3.select('test', 'test_04_delete_2'), + []) + + def test_05_upsert(self): + self.set_cluster_ro([True, False, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_1 = tarantool.connect( + host=self.addrs[1]['host'], + port=self.addrs[1]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.upsert('test', ['test_05_upsert', 3], [('+', 1, 1)]), + []) + self.assertSequenceEqual( + conn_1.select('test', 'test_05_upsert'), + [['test_05_upsert', 3]]) + + self.assertSequenceEqual( + self.pool.upsert('test', ['test_05_upsert', 3], + [('+', 1, 1)], mode=tarantool.Mode.RW), []) + self.assertSequenceEqual( + conn_1.select('test', 'test_05_upsert'), + [['test_05_upsert', 4]]) + + def test_06_update(self): + self.set_cluster_ro([True, True, True, True, False]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_4 = tarantool.connect( + host=self.addrs[4]['host'], + port=self.addrs[4]['port'], + user='test', + password='test') + conn_4.insert('test', ['test_06_update_1', 3]) + conn_4.insert('test', ['test_06_update_2', 14]) + + self.assertSequenceEqual( + self.pool.update('test', ('test_06_update_1',), [('+', 1, 1)]), + [['test_06_update_1', 4]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_06_update_1'), + [['test_06_update_1', 4]]) + + self.assertSequenceEqual( + self.pool.update('test', ('test_06_update_2',), + [('=', 1, 10)], mode=tarantool.Mode.RW), + [['test_06_update_2', 10]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_06_update_2'), + [['test_06_update_2', 10]]) + + def test_07_replace(self): + self.set_cluster_ro([True, True, True, True, False]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_4 = tarantool.connect( + host=self.addrs[4]['host'], + port=self.addrs[4]['port'], + user='test', + password='test') + conn_4.insert('test', ['test_07_replace', 3]) + + self.assertSequenceEqual( + self.pool.replace('test', ['test_07_replace', 4], + mode=tarantool.Mode.RW), + [['test_07_replace', 4]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_07_replace'), + [['test_07_replace', 4]]) + + self.assertSequenceEqual( + self.pool.replace('test', ['test_07_replace', 5]), + [['test_07_replace', 5]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_07_replace'), + [['test_07_replace', 5]]) + + def test_08_select(self): + self.set_cluster_ro([False, False, False, False, False]) + + for addr in self.addrs: + conn = tarantool.connect( + host=addr['host'], + port=addr['port'], + user='test', + password='test') + conn.insert('test', ['test_08_select', 3]) + + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.select('test', 'test_08_select'), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', ['test_08_select'], + mode=tarantool.Mode.ANY), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', 3, index='id', + mode=tarantool.Mode.RO), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', [3], index='id', + mode=tarantool.Mode.PREFER_RW), + [['test_08_select', 3]]) + + def test_09_ping(self): + self.pool = tarantool.ConnectionPool(addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.ping() + + self.assertTrue(self.pool.ping(mode=tarantool.Mode.ANY) > 0) + self.assertEqual(self.pool.ping(mode=tarantool.Mode.RW, notime=True), "Success") + + def test_10_call(self): + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.call('box.info') + + self.assertEqual( + self.pool.call('box.info', mode=tarantool.Mode.RW)[0]['ro'], + False) + + self.assertSequenceEqual( + self.pool.call('json.encode', {'test_10_call': 1}, mode=tarantool.Mode.ANY), + ['{"test_10_call":1}']) + + def test_11_eval(self): + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.eval('return box.info()') + + self.assertEqual( + self.pool.eval('return box.info()', mode=tarantool.Mode.RW)[0]['ro'], + False) + + self.assertSequenceEqual( + self.pool.eval('return json.encode(...)', {'test_11_eval': 1}, mode=tarantool.Mode.ANY), + ['{"test_11_eval":1}']) + + @skip_or_run_sql_test + def test_12_execute(self): + self.set_cluster_ro([False, True, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.execute("insert into \"test\" values ('test_12_execute_1', 1)") + + resp = self.pool.execute( + "insert into \"test\" values ('test_12_execute_1', 1)", + mode=tarantool.Mode.RW) + + self.assertEqual(resp.affected_row_count, 1) + self.assertEqual(resp.data, None) + + resp = self.pool.execute( + 'insert into "test" values (:pk, :id)', + { 'pk': 'test_12_execute_2', 'id': 2}, + mode=tarantool.Mode.RW) + self.assertEqual(resp.affected_row_count, 1) + self.assertEqual(resp.data, None) + + conn_0 = tarantool.connect( + host=self.addrs[0]['host'], + port=self.addrs[0]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + conn_0.select('test', 'test_12_execute_1'), + [['test_12_execute_1', 1]]) + self.assertSequenceEqual( + conn_0.select('test', 'test_12_execute_2'), + [['test_12_execute_2', 2]]) + + def test_13_failover(self): + self.set_cluster_ro([False, True, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + # Simulate failover + self.servers[0].stop() + self.set_ro(self.servers[1], False) + + def expect_RW_request_execute_on_new_master(): + self.assertSequenceEqual( + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW), + [ str(self.addrs[1]['port']) ]) + + self.retry(func=expect_RW_request_execute_on_new_master) + + def test_14_cluster_with_instances_dead_in_runtime_is_ok(self): + self.set_cluster_ro([False, True, False, True, True]) + self.servers[0].stop() + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + self.pool.ping(mode=tarantool.Mode.RW) + + def test_15_cluster_with_dead_instances_on_start_is_ok(self): + self.set_cluster_ro([False, True, True, True, True]) + self.servers[0].stop() + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + self.servers[0].start() + + def ping_RW(): + self.pool.ping(mode=tarantool.Mode.RW) + + self.retry(func=ping_RW) + + def test_16_is_closed(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test',) + + self.assertEquals(self.pool.is_closed(), False) + + self.pool.close() + + self.assertEquals(self.pool.is_closed(), True) + + def tearDown(self): + if hasattr(self, 'pool'): + self.pool.close() + + for srv in self.servers: + srv.stop() + srv.clean()