From 150751a4abfae94bf05338fea65f9c672542212b Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Thu, 17 Mar 2022 16:32:08 +0300 Subject: [PATCH] connection_pool: introduce connection pool Introduce ConnectionPool class to work with cluster of Tarantool instances. ConnectionPool support master discovery and ro/rw-based requests, so it is most useful while working with a single replicaset of instances. ConnectionPool is supported only for Python 3.7 or newer. Authenticated user must be able to call `box.info` on instances. ConnectionPool updates information about each server state (RO/RW) on initial connect and then asynchronously in separate threads. Application retries must be written considering the asynchronous nature of cluster state refresh. User does not need to use any synchronization mechanisms in requests, it's all handled with ConnectionPool methods. ConnectionPool API is the same as a plain Connection API. On each request, a connection is chosen to execute this request. A connection is chosen based on a request mode: * Mode.ANY chooses any instance. * Mode.RW chooses an RW instance. * Mode.RO chooses an RO instance. * Mode.PREFER_RW chooses an RW instance, if possible, RO instance otherwise. * Mode.PREFER_RO chooses an RO instance, if possible, RW instance otherwise. All requests that are guaranteed to write (insert, replace, delete, upsert, update) use RW mode by default. select uses ANY by default. You can set the mode explicitly. call, eval, execute and ping requests require to set the mode explicitly. Example: pool = tarantool.ConnectionPool( addrs=[ {'host': '108.177.16.0', 'port': 3301}, {'host': '108.177.16.0', 'port': 3302}, ], user='test', password='test',) pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW) Closes #196 --- CHANGELOG.md | 38 +++ tarantool/__init__.py | 5 + tarantool/connection_pool.py | 468 ++++++++++++++++++++++++++++++ tarantool/const.py | 6 + tarantool/error.py | 15 + test/suites/__init__.py | 3 +- test/suites/lib/skip.py | 35 ++- test/suites/test_pool.py | 539 +++++++++++++++++++++++++++++++++++ 8 files changed, 1101 insertions(+), 8 deletions(-) create mode 100644 tarantool/connection_pool.py create mode 100644 test/suites/test_pool.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b541c274..7a0eae78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Reusable testing workflow for integration with tarantool artifacts (PR #192). +- Connection pool with master discovery (PR #211, #196). + + ConnectionPool is supported only for Python 3.7 or newer. + Authenticated user must be able to call `box.info` on instances. + + ConnectionPool updates information about each server state (RO/RW) + on initial connect and then asynchronously in separate threads. + Application retries must be written considering the asynchronous nature + of cluster state refresh. User does not need to use any synchronization + mechanisms in requests, it's all handled with ConnectionPool methods. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + A connection is chosen based on a request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require to set the mode explicitly. + + Example: + ```python + pool = tarantool.ConnectionPool( + addrs=[ + {'host': '108.177.16.0', 'port': 3301}, + {'host': '108.177.16.0', 'port': 3302}, + ], + user='test', + password='test',) + + pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW) + ``` ### Changed - **Breaking**: drop Python 2 support (PR #207). diff --git a/tarantool/__init__.py b/tarantool/__init__.py index e8c6c916..8f7b6d07 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -78,3 +78,8 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None, __all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema', 'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning', 'SchemaError', 'dbapi'] + +# ConnectionPool is supported only for Python 3.7 or newer. +if sys.version_info.major >= 3 and sys.version_info.minor >= 7: + from tarantool.connection_pool import ConnectionPool, Mode + __all__.extend(['ConnectionPool', 'Mode']) diff --git a/tarantool/connection_pool.py b/tarantool/connection_pool.py new file mode 100644 index 00000000..45e5fead --- /dev/null +++ b/tarantool/connection_pool.py @@ -0,0 +1,468 @@ +# -*- coding: utf-8 -*- + +import abc +import itertools +import queue +import threading +import time +import typing +from dataclasses import dataclass, field +from enum import Enum + +from tarantool.connection import Connection, ConnectionInterface +from tarantool.const import ( + CONNECTION_TIMEOUT, + POOL_INSTANCE_RECONNECT_DELAY, + POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + POOL_REFRESH_DELAY, + SOCKET_TIMEOUT +) +from tarantool.error import ( + ClusterConnectWarning, + PoolTolopogyError, + PoolTolopogyWarning, + ConfigurationError, + DatabaseError, + NetworkError, + NetworkWarning, + tnt_strerror, + warn +) +from tarantool.utils import ENCODING_DEFAULT +from tarantool.mesh_connection import validate_address + + +class Mode(Enum): + ANY = 1 + RW = 2 + RO = 3 + PREFER_RW = 4 + PREFER_RO = 5 + + +class Status(Enum): + HEALTHY = 1 + UNHEALTHY = 2 + + +@dataclass +class InstanceState(): + status: Status = Status.UNHEALTHY + ro: typing.Optional[bool] = None + + +def QueueFactory(): + return queue.Queue(maxsize=1) + + +@dataclass +class PoolUnit(): + addr: dict + conn: Connection + input_queue: queue.Queue = field(default_factory=QueueFactory) + output_queue: queue.Queue = field(default_factory=QueueFactory) + thread: typing.Optional[threading.Thread] = None + state: InstanceState = field(default_factory=InstanceState) + # request_processing_enabled is used to stop requests processing + # in background thread on close or destruction. + request_processing_enabled: bool = False + + +# Based on https://realpython.com/python-interface/ +class StrategyInterface(metaclass=abc.ABCMeta): + @classmethod + def __subclasshook__(cls, subclass): + return (hasattr(subclass, 'update') and + callable(subclass.update) and + hasattr(subclass, 'getnext') and + callable(subclass.getnext) or + NotImplemented) + + @abc.abstractmethod + def update(self): + raise NotImplementedError + + @abc.abstractmethod + def getnext(self, mode): + raise NotImplementedError + +class RoundRobinStrategy(object): + """ + Simple round-robin connection rotation + """ + def __init__(self, pool): + self.ANY_iter = None + self.RW_iter = None + self.RO_iter = None + self.pool = pool + self.rebuild_needed = True + + def build(self): + ANY_pool = [] + RW_pool = [] + RO_pool = [] + + for key in self.pool: + state = self.pool[key].state + + if state.status == Status.UNHEALTHY: + continue + + ANY_pool.append(key) + + if state.ro == False: + RW_pool.append(key) + else: + RO_pool.append(key) + + if len(ANY_pool) > 0: + self.ANY_iter = itertools.cycle(ANY_pool) + else: + self.ANY_iter = None + + if len(RW_pool) > 0: + self.RW_iter = itertools.cycle(RW_pool) + else: + self.RW_iter = None + + if len(RO_pool) > 0: + self.RO_iter = itertools.cycle(RO_pool) + else: + self.RO_iter = None + + self.rebuild_needed = False + + def update(self): + self.rebuild_needed = True + + def getnext(self, mode): + if self.rebuild_needed: + self.build() + + if mode == Mode.ANY: + if self.ANY_iter is not None: + return next(self.ANY_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.RW: + if self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy rw instance in pool") + elif mode == Mode.RO: + if self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy ro instance in pool") + elif mode == Mode.PREFER_RO: + if self.RO_iter is not None: + return next(self.RO_iter) + elif self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.PREFER_RW: + if self.RW_iter is not None: + return next(self.RW_iter) + elif self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + + +@dataclass +class PoolTask(): + method_name: str + args: tuple + kwargs: dict + + +class ConnectionPool(ConnectionInterface): + ''' + Represents pool of connections to the cluster of Tarantool servers. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + Connection is selected based on request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require to set the mode explicitly. + ''' + def __init__(self, + addrs, + user=None, + password=None, + socket_timeout=SOCKET_TIMEOUT, + reconnect_max_attempts=POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + reconnect_delay=POOL_INSTANCE_RECONNECT_DELAY, + connect_now=True, + encoding=ENCODING_DEFAULT, + call_16=False, + connection_timeout=CONNECTION_TIMEOUT, + strategy_class=RoundRobinStrategy, + refresh_delay=POOL_REFRESH_DELAY): + ''' + Initialize connections to the cluster of servers. + + :param list addrs: List of {host: , port:} dictionaries, + describing server addresses. + :param int reconnect_max_attempts: Max attempts to reconnect + for each connection in the pool. Be careful with reconnect + parameters in ConnectionPool since every status refresh is + also a request with reconnection. Default is 0 (fail after + first attempt). + :param float reconnect_delay: Time between reconnect + attempts for each connection in the pool. Be careful with + reconnect parameters in ConnectionPool since every status + refresh is also a request with reconnection. Default is 0. + :param StrategyInterface strategy_class: Class for choosing + instance based on request mode. By default, round-robin + strategy is used. + :param int refresh_delay: Minimal time between RW/RO status + refreshes. + ''' + + if not isinstance(addrs, list) or len(addrs) == 0: + raise ConfigurationError("addrs must be non-empty list") + + # Verify addresses. + for addr in addrs: + ok, msg = validate_address(addr) + if not ok: + raise ConfigurationError(msg) + self.addrs = addrs + + # Create connections + self.pool = {} + self.refresh_delay = refresh_delay + self.strategy = strategy_class(self.pool) + + for addr in self.addrs: + key = self._make_key(addr) + self.pool[key] = PoolUnit( + addr=addr, + conn=Connection( + host=addr['host'], + port=addr['port'], + user=user, + password=password, + socket_timeout=socket_timeout, + reconnect_max_attempts=reconnect_max_attempts, + reconnect_delay=reconnect_delay, + connect_now=False, # Connect in ConnectionPool.connect() + encoding=encoding, + call_16=call_16, + connection_timeout=connection_timeout) + ) + + if connect_now: + self.connect() + + def __del__(self): + self.close() + + def _make_key(self, addr): + return '{0}:{1}'.format(addr['host'], addr['port']) + + def _get_new_state(self, unit): + conn = unit.conn + + if conn.is_closed(): + try: + conn.connect() + except NetworkError as e: + msg = "Failed to connect to {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, ClusterConnectWarning) + return InstanceState(Status.UNHEALTHY) + + try: + resp = conn.call('box.info') + except NetworkError as e: + msg = "Failed to get box.info for {0}:{1}, reason: {2}".format( + unit.addr['host'], unit.addr['port'], repr(e)) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + ro = resp.data[0]['ro'] + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + status = resp.data[0]['status'] + + if status != 'running': + msg = "{0}:{1} instance status is not 'running'".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + return InstanceState(Status.HEALTHY, ro) + + def _refresh_state(self, key): + unit = self.pool[key] + + state = self._get_new_state(unit) + if state != unit.state: + unit.state = state + self.strategy.update() + + def close(self): + for unit in self.pool.values(): + unit.request_processing_enabled = False + unit.thread.join() + + if not unit.conn.is_closed(): + unit.conn.close() + + def is_closed(self): + return all(unit.request_processing_enabled == False for unit in self.pool.values()) + + def _request_process_loop(self, key, unit, last_refresh): + while unit.request_processing_enabled: + if not unit.input_queue.empty(): + task = unit.input_queue.get() + method = getattr(Connection, task.method_name) + try: + resp = method(unit.conn, *task.args, **task.kwargs) + except Exception as e: + unit.output_queue.put(e) + else: + unit.output_queue.put(resp) + + now = time.time() + + if now - last_refresh > self.refresh_delay: + self._refresh_state(key) + last_refresh = time.time() + + def connect(self): + for key in self.pool: + unit = self.pool[key] + + self._refresh_state(key) + last_refresh = time.time() + + unit.thread = threading.Thread( + target=self._request_process_loop, + args=(key, unit, last_refresh), + daemon=True, + ) + unit.request_processing_enabled = True + unit.thread.start() + + def _send(self, mode, method_name, *args, **kwargs): + key = self.strategy.getnext(mode) + unit = self.pool[key] + + task = PoolTask(method_name=method_name, args=args, kwargs=kwargs) + + unit.input_queue.put(task) + resp = unit.output_queue.get() + + if isinstance(resp, Exception): + raise resp + + return resp + + def call(self, func_name, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'call', func_name, *args) + + def eval(self, expr, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'eval', expr, *args) + + def replace(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'replace', space_name, values) + + def insert(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'insert', space_name, values) + + def delete(self, space_name, key, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'delete', space_name, key, **kwargs) + + def upsert(self, space_name, tuple_value, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'upsert', space_name, tuple_value, + op_list, **kwargs) + + def update(self, space_name, key, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'update', space_name, key, + op_list, **kwargs) + + def ping(self, *, mode=None, **kwargs): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'ping', **kwargs) + + def select(self, space_name, key, *, mode=Mode.ANY, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is + ANY). + ''' + + return self._send(mode, 'select', space_name, key, **kwargs) + + def execute(self, query, params=None, *, mode=None): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'execute', query, params) diff --git a/tarantool/const.py b/tarantool/const.py index 0db35978..9eb834a8 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -96,3 +96,9 @@ RECONNECT_DELAY = 0.1 # Default cluster nodes list refresh interval (seconds) CLUSTER_DISCOVERY_DELAY = 60 +# Default cluster nodes state refresh interval (seconds) +POOL_REFRESH_DELAY = 1 +# Default maximum number of attempts to reconnect for pool instance +POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0 +# Default delay between attempts to reconnect (seconds) +POOL_INSTANCE_RECONNECT_DELAY = 0 diff --git a/tarantool/error.py b/tarantool/error.py index 78519b68..c7165a9b 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -223,6 +223,21 @@ class ClusterDiscoveryWarning(UserWarning): pass +class ClusterConnectWarning(UserWarning): + '''Warning related to cluster pool connection''' + pass + + +class PoolTolopogyWarning(UserWarning): + '''Warning related to ro/rw cluster pool topology''' + pass + + +class PoolTolopogyError(DatabaseError): + '''Exception raised due to unsatisfying ro/rw cluster pool topology''' + pass + + # always print this warnings warnings.filterwarnings("always", category=NetworkWarning) diff --git a/test/suites/__init__.py b/test/suites/__init__.py index 8e2eafc1..1868ad42 100644 --- a/test/suites/__init__.py +++ b/test/suites/__init__.py @@ -10,6 +10,7 @@ from .test_protocol import TestSuite_Protocol from .test_reconnect import TestSuite_Reconnect from .test_mesh import TestSuite_Mesh +from .test_pool import TestSuite_Pool from .test_execute import TestSuite_Execute from .test_dbapi import TestSuite_DBAPI from .test_encoding import TestSuite_Encoding @@ -18,7 +19,7 @@ TestSuite_Schema_BinaryConnection, TestSuite_Request, TestSuite_Protocol, TestSuite_Reconnect, TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI, - TestSuite_Encoding) + TestSuite_Encoding, TestSuite_Pool) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/test/suites/lib/skip.py b/test/suites/lib/skip.py index 5fcc9355..284b70b6 100644 --- a/test/suites/lib/skip.py +++ b/test/suites/lib/skip.py @@ -18,8 +18,18 @@ def wrapper(self, *args, **kwargs): func(self, *args, **kwargs) if not hasattr(self, 'tnt_version'): + srv = None + + if hasattr(self, 'servers'): + srv = self.servers[0] + + if hasattr(self, 'srv'): + srv = self.srv + + assert srv is not None + self.__class__.tnt_version = re.match( - r'[\d.]+', self.srv.admin('box.info.version')[0] + r'[\d.]+', srv.admin('box.info.version')[0] ).group() tnt_version = pkg_resources.parse_version(self.tnt_version) @@ -34,9 +44,8 @@ def wrapper(self, *args, **kwargs): return wrapper -def skip_or_run_test_python_major(func, REQUIRED_PYTHON_MAJOR, msg): - """Decorator to skip or run tests depending on the Python major - version. +def skip_or_run_test_python(func, REQUIRED_PYTHON_VERSION, msg): + """Decorator to skip or run tests depending on the Python version. Also, it can be used with the 'setUp' method for skipping the whole test suite. @@ -47,9 +56,12 @@ def wrapper(self, *args, **kwargs): if func.__name__ == 'setUp': func(self, *args, **kwargs) - major = sys.version_info.major - if major != REQUIRED_PYTHON_MAJOR: - self.skipTest('Python %s connector %s' % (major, msg)) + ver = sys.version_info + python_version_str = '%d.%d' % (ver.major, ver.minor) + python_version = pkg_resources.parse_version(python_version_str) + support_version = pkg_resources.parse_version(REQUIRED_PYTHON_VERSION) + if python_version < support_version: + self.skipTest('Python %s connector %s' % (python_version, msg)) if func.__name__ != 'setUp': func(self, *args, **kwargs) @@ -80,3 +92,12 @@ def skip_or_run_varbinary_test(func): return skip_or_run_test_tarantool(func, '2.2.1', 'does not support VARBINARY type') + +def skip_or_run_conn_pool_test(func): + """Decorator to skip or run ConnectionPool tests depending on + the Python version. + """ + + return skip_or_run_test_python(func, '3.7', + 'does not support ConnectionPool') + diff --git a/test/suites/test_pool.py b/test/suites/test_pool.py new file mode 100644 index 00000000..12263d79 --- /dev/null +++ b/test/suites/test_pool.py @@ -0,0 +1,539 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import sys +import time +import unittest +import warnings + +import tarantool +from tarantool.error import PoolTolopogyError, DatabaseError, NetworkError + +from .lib.skip import skip_or_run_sql_test, skip_or_run_conn_pool_test +from .lib.tarantool_server import TarantoolServer + + +def create_server(_id): + srv = TarantoolServer() + srv.script = 'test/suites/box.lua' + srv.start() + srv.admin("box.schema.user.create('test', {password = 'test', " + + "if_not_exists = true})") + srv.admin("box.schema.user.grant('test', 'execute', 'universe')") + srv.admin("box.schema.space.create('test')") + srv.admin(r"box.space.test:format({" + +r" { name = 'pk', type = 'string' }," + + r" { name = 'id', type = 'number', is_nullable = true }" + + r"})") + srv.admin(r"box.space.test:create_index('pk'," + + r"{ unique = true," + + r" parts = {{field = 1, type = 'string'}}})") + srv.admin(r"box.space.test:create_index('id'," + + r"{ unique = true," + + r" parts = {{field = 2, type = 'number', is_nullable=true}}})") + srv.admin("box.schema.user.grant('test', 'read,write', 'space', 'test')") + srv.admin("json = require('json')") + + # Create srv_id function (for testing purposes). + srv.admin("function srv_id() return %s end" % _id) + return srv + + +@unittest.skipIf(sys.platform.startswith("win"), + 'Pool tests on windows platform are not supported') +class TestSuite_Pool(unittest.TestCase): + def set_ro(self, srv, read_only): + if read_only: + req = r'box.cfg{read_only = true}' + else: + req = r'box.cfg{read_only = false}' + + srv.admin(req) + + def set_cluster_ro(self, read_only_list): + assert len(self.servers) == len(read_only_list) + + for i in range(len(self.servers)): + self.set_ro(self.servers[i], read_only_list[i]) + + def retry(self, func, count=5, timeout=0.5): + for i in range(count): + try: + func() + except Exception as e: + if i + 1 == count: + raise e + + time.sleep(timeout) + + @classmethod + def setUpClass(self): + print(' POOL '.center(70, '='), file=sys.stderr) + print('-' * 70, file=sys.stderr) + + @skip_or_run_conn_pool_test + def setUp(self): + # Create five servers and extract helpful fields for tests. + self.servers = [] + self.addrs = [] + self.servers_count = 5 + for i in range(self.servers_count): + srv = create_server(i) + self.servers.append(srv) + self.addrs.append({'host': srv.host, 'port': srv.args['primary']}) + + def test_00_basic(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.RW), + [False]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.RO), + [True]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RW), + [False]) + self.assertSequenceEqual( + self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RO), + [True]) + + def test_01_roundrobin(self): + self.set_cluster_ro([False, False, True, False, True]) + RW_ports = set([str(self.addrs[0]['port']), str(self.addrs[1]['port']), str(self.addrs[3]['port'])]) + RO_ports = set([str(self.addrs[2]['port']), str(self.addrs[4]['port'])]) + all_ports = set() + for addr in self.addrs: + all_ports.add(str(addr['port'])) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + def get_port(self, mode): + resp = self.pool.eval('return box.cfg.listen', mode=mode) + self.assertIsInstance(resp.data[0], str) + return resp.data[0] + + # Expect ANY iterate through all instances. + ANY_ports_result = set() + for i in range(len(self.servers)): + ANY_ports_result.add(get_port(self, tarantool.Mode.ANY)) + + self.assertSetEqual(ANY_ports_result, all_ports) + + # Expect RW iterate through all RW instances. + RW_ports_result = set() + for i in range(len(self.servers)): + RW_ports_result.add(get_port(self, tarantool.Mode.RW)) + + self.assertSetEqual(RW_ports_result, RW_ports) + + # Expect RO iterate through all RO instances. + RO_ports_result = set() + for i in range(len(self.servers)): + RO_ports_result.add(get_port(self, tarantool.Mode.RO)) + + self.assertSetEqual(RO_ports_result, RO_ports) + + # Expect PREFER_RW iterate through all RW instances if there is at least one. + PREFER_RW_ports_result = set() + for i in range(len(self.servers)): + PREFER_RW_ports_result.add(get_port(self, tarantool.Mode.PREFER_RW)) + + self.assertSetEqual(PREFER_RW_ports_result, RW_ports) + + # Expect PREFER_RO iterate through all RO instances if there is at least one. + PREFER_RO_ports_result = set() + for i in range(len(self.servers)): + PREFER_RO_ports_result.add(get_port(self, tarantool.Mode.PREFER_RO)) + + self.assertSetEqual(PREFER_RO_ports_result, RO_ports) + + # Setup cluster with no RW. + self.set_cluster_ro([True, True, True, True, True]) + + # Expect RW to fail if there are no RW. + def expect_RW_to_fail_if_there_are_no_RW(): + with self.assertRaises(PoolTolopogyError): + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW) + + self.retry(func=expect_RW_to_fail_if_there_are_no_RW) + + # Expect PREFER_RW iterate through all instances if there are no RW. + def expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW(): + PREFER_RW_ports_result_all_ro = set() + for i in range(len(self.servers)): + PREFER_RW_ports_result_all_ro.add(get_port(self, tarantool.Mode.PREFER_RW)) + + self.assertSetEqual(PREFER_RW_ports_result_all_ro, all_ports) + + self.retry(func=expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW) + + # Setup cluster with no RO. + self.set_cluster_ro([False, False, False, False, False]) + + # Expect RO to fail if there are no RO. + def expect_RO_to_fail_if_there_are_no_RO(): + with self.assertRaises(PoolTolopogyError): + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RO) + + self.retry(func=expect_RO_to_fail_if_there_are_no_RO) + + # Expect PREFER_RO iterate through all instances if there are no RO. + def expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO(): + PREFER_RO_ports_result_all_rw = set() + for i in range(len(self.servers)): + PREFER_RO_ports_result_all_rw.add(get_port(self, tarantool.Mode.PREFER_RO)) + + self.assertSetEqual(PREFER_RO_ports_result_all_rw, all_ports) + + self.retry(func=expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO) + + def test_02_exception_raise(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaises(DatabaseError): + self.pool.call('non_existing_procedure', mode=tarantool.Mode.ANY) + + def test_03_insert(self): + self.set_cluster_ro([True, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.insert('test', ['test_03_insert_1', 1]), + [['test_03_insert_1', 1]]) + self.assertSequenceEqual( + self.pool.insert('test', ['test_03_insert_2', 2], + mode=tarantool.Mode.RW), + [['test_03_insert_2', 2]]) + + conn_2 = tarantool.connect( + host=self.addrs[2]['host'], + port=self.addrs[2]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + conn_2.select('test', 'test_03_insert_1'), + [['test_03_insert_1', 1]]) + + def test_04_delete(self): + self.set_cluster_ro([True, True, True, False, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_3 = tarantool.connect( + host=self.addrs[3]['host'], + port=self.addrs[3]['port'], + user='test', + password='test') + + conn_3.insert('test', ['test_04_delete_1', 1]) + conn_3.insert('test', ['test_04_delete_2', 2]) + + self.assertSequenceEqual( + self.pool.delete('test', 'test_04_delete_1'), + [['test_04_delete_1', 1]]) + self.assertSequenceEqual( + conn_3.select('test', 'test_04_delete_1'), + []) + + self.assertSequenceEqual( + self.pool.delete('test', 2, index='id', mode=tarantool.Mode.RW), + [['test_04_delete_2', 2]]) + self.assertSequenceEqual( + conn_3.select('test', 'test_04_delete_2'), + []) + + def test_05_upsert(self): + self.set_cluster_ro([True, False, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_1 = tarantool.connect( + host=self.addrs[1]['host'], + port=self.addrs[1]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.upsert('test', ['test_05_upsert', 3], [('+', 1, 1)]), + []) + self.assertSequenceEqual( + conn_1.select('test', 'test_05_upsert'), + [['test_05_upsert', 3]]) + + self.assertSequenceEqual( + self.pool.upsert('test', ['test_05_upsert', 3], + [('+', 1, 1)], mode=tarantool.Mode.RW), []) + self.assertSequenceEqual( + conn_1.select('test', 'test_05_upsert'), + [['test_05_upsert', 4]]) + + def test_06_update(self): + self.set_cluster_ro([True, True, True, True, False]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_4 = tarantool.connect( + host=self.addrs[4]['host'], + port=self.addrs[4]['port'], + user='test', + password='test') + conn_4.insert('test', ['test_06_update_1', 3]) + conn_4.insert('test', ['test_06_update_2', 14]) + + self.assertSequenceEqual( + self.pool.update('test', ('test_06_update_1',), [('+', 1, 1)]), + [['test_06_update_1', 4]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_06_update_1'), + [['test_06_update_1', 4]]) + + self.assertSequenceEqual( + self.pool.update('test', ('test_06_update_2',), + [('=', 1, 10)], mode=tarantool.Mode.RW), + [['test_06_update_2', 10]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_06_update_2'), + [['test_06_update_2', 10]]) + + def test_07_replace(self): + self.set_cluster_ro([True, True, True, True, False]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + conn_4 = tarantool.connect( + host=self.addrs[4]['host'], + port=self.addrs[4]['port'], + user='test', + password='test') + conn_4.insert('test', ['test_07_replace', 3]) + + self.assertSequenceEqual( + self.pool.replace('test', ['test_07_replace', 4], + mode=tarantool.Mode.RW), + [['test_07_replace', 4]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_07_replace'), + [['test_07_replace', 4]]) + + self.assertSequenceEqual( + self.pool.replace('test', ['test_07_replace', 5]), + [['test_07_replace', 5]]) + self.assertSequenceEqual( + conn_4.select('test', 'test_07_replace'), + [['test_07_replace', 5]]) + + def test_08_select(self): + self.set_cluster_ro([False, False, False, False, False]) + + for addr in self.addrs: + conn = tarantool.connect( + host=addr['host'], + port=addr['port'], + user='test', + password='test') + conn.insert('test', ['test_08_select', 3]) + + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + self.assertSequenceEqual( + self.pool.select('test', 'test_08_select'), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', ['test_08_select'], + mode=tarantool.Mode.ANY), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', 3, index='id', + mode=tarantool.Mode.RO), + [['test_08_select', 3]]) + self.assertSequenceEqual( + self.pool.select('test', [3], index='id', + mode=tarantool.Mode.PREFER_RW), + [['test_08_select', 3]]) + + def test_09_ping(self): + self.pool = tarantool.ConnectionPool(addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.ping() + + self.assertTrue(self.pool.ping(mode=tarantool.Mode.ANY) > 0) + self.assertEqual(self.pool.ping(mode=tarantool.Mode.RW, notime=True), "Success") + + def test_10_call(self): + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.call('box.info') + + self.assertEqual( + self.pool.call('box.info', mode=tarantool.Mode.RW)[0]['ro'], + False) + + self.assertSequenceEqual( + self.pool.call('json.encode', {'test_10_call': 1}, mode=tarantool.Mode.ANY), + ['{"test_10_call":1}']) + + def test_11_eval(self): + self.set_cluster_ro([False, True, False, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.eval('return box.info()') + + self.assertEqual( + self.pool.eval('return box.info()', mode=tarantool.Mode.RW)[0]['ro'], + False) + + self.assertSequenceEqual( + self.pool.eval('return json.encode(...)', {'test_11_eval': 1}, mode=tarantool.Mode.ANY), + ['{"test_11_eval":1}']) + + @skip_or_run_sql_test + def test_12_execute(self): + self.set_cluster_ro([False, True, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test') + + with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"): + self.pool.execute("insert into \"test\" values ('test_12_execute_1', 1)") + + resp = self.pool.execute( + "insert into \"test\" values ('test_12_execute_1', 1)", + mode=tarantool.Mode.RW) + + self.assertEqual(resp.affected_row_count, 1) + self.assertEqual(resp.data, None) + + resp = self.pool.execute( + 'insert into "test" values (:pk, :id)', + { 'pk': 'test_12_execute_2', 'id': 2}, + mode=tarantool.Mode.RW) + self.assertEqual(resp.affected_row_count, 1) + self.assertEqual(resp.data, None) + + conn_0 = tarantool.connect( + host=self.addrs[0]['host'], + port=self.addrs[0]['port'], + user='test', + password='test') + + self.assertSequenceEqual( + conn_0.select('test', 'test_12_execute_1'), + [['test_12_execute_1', 1]]) + self.assertSequenceEqual( + conn_0.select('test', 'test_12_execute_2'), + [['test_12_execute_2', 2]]) + + def test_13_failover(self): + self.set_cluster_ro([False, True, True, True, True]) + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + # Simulate failover + self.servers[0].stop() + self.set_ro(self.servers[1], False) + + def expect_RW_request_execute_on_new_master(): + self.assertSequenceEqual( + self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW), + [ str(self.addrs[1]['port']) ]) + + self.retry(func=expect_RW_request_execute_on_new_master) + + def test_14_cluster_with_instances_dead_in_runtime_is_ok(self): + self.set_cluster_ro([False, True, False, True, True]) + self.servers[0].stop() + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + self.pool.ping(mode=tarantool.Mode.RW) + + def test_15_cluster_with_dead_instances_on_start_is_ok(self): + self.set_cluster_ro([False, True, True, True, True]) + self.servers[0].stop() + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test', + refresh_delay=0.2) + + self.servers[0].start() + + def ping_RW(): + self.pool.ping(mode=tarantool.Mode.RW) + + self.retry(func=ping_RW) + + def test_16_is_closed(self): + self.set_cluster_ro([False, False, True, False, True]) + + self.pool = tarantool.ConnectionPool( + addrs=self.addrs, + user='test', + password='test',) + + self.assertEquals(self.pool.is_closed(), False) + + self.pool.close() + + self.assertEquals(self.pool.is_closed(), True) + + def tearDown(self): + if hasattr(self, 'pool'): + self.pool.close() + + for srv in self.servers: + srv.stop() + srv.clean()