From 7a4659b6b9707640437ba6e54e8ae4cef0ff367f Mon Sep 17 00:00:00 2001 From: Mike Siomkin Date: Thu, 21 Mar 2019 14:34:19 +0300 Subject: [PATCH] Implement getting the cluster nodes list from nodes Periodically call a user-defined Lua function on the current node to obtain or refresh the full cluster nodes list. Resolves: #134 --- tarantool/connection.py | 49 ++++++++++++++++++++++++++--- tarantool/const.py | 2 ++ tarantool/mesh_connection.py | 34 ++++++++++++++++++-- unit/suites/test_reconnect.py | 58 +++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 6 deletions(-) diff --git a/tarantool/connection.py b/tarantool/connection.py index 6f330896..550d1af2 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -126,6 +126,7 @@ def __init__(self, host, port, self.encoding = encoding self.call_16 = call_16 self.connection_timeout = connection_timeout + self.authenticated = False if connect_now: self.connect() @@ -200,9 +201,11 @@ def connect(self): :raise: `NetworkError` ''' try: + self.authenticated = False self.connect_basic() self.handshake() self.load_schema() + self.authenticated = True except Exception as e: self.connected = False raise NetworkError(e) @@ -370,14 +373,33 @@ def call(self, func_name, *args): :rtype: `Response` instance ''' - assert isinstance(func_name, str) # This allows to use a tuple or list as an argument if len(args) == 1 and isinstance(args[0], (list, tuple)): args = args[0] + return self.call_ex(func_name, args, True) + + def call_ex(self, func_name, args=[], reconnect=True): + ''' + Execute CALL request. Call stored Lua function. + + :param func_name: stored Lua function name + :type func_name: str + :param args: list of function arguments + :type args: list or tuple + :param reconnect: reconnect before call + :type reconnect: boolean + + :rtype: `Response` instance + ''' + assert isinstance(func_name, str) + request = RequestCall(self, func_name, args, self.call_16) - response = self._send_request(request) + if reconnect: + response = self._send_request(request) + else: + response = self._send_request_wo_reconnect(request) return response def eval(self, expr, *args): @@ -391,14 +413,33 @@ def eval(self, expr, *args): :rtype: `Response` instance ''' - assert isinstance(expr, str) # This allows to use a tuple or list as an argument if len(args) == 1 and isinstance(args[0], (list, tuple)): args = args[0] + return self.eval_ex(expr, args, True) + + def eval_ex(self, expr, args=[], reconnect=True): + ''' + Execute EVAL request. Eval Lua expression. + + :param expr: Lua expression + :type expr: str + :param args: list of function arguments + :type args: list or tuple + :param reconnect: reconnect before call + :type reconnect: boolean + + :rtype: `Response` instance + ''' + assert isinstance(expr, str) + request = RequestEval(self, expr, args) - response = self._send_request(request) + if reconnect: + response = self._send_request(request) + else: + response = self._send_request_wo_reconnect(request) return response def replace(self, space_name, values): diff --git a/tarantool/const.py b/tarantool/const.py index 1ebad494..61904911 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -86,3 +86,5 @@ RECONNECT_MAX_ATTEMPTS = 10 # Default delay between attempts to reconnect (seconds) RECONNECT_DELAY = 0.1 +# Default cluster nodes list refresh interval (seconds) +NODES_REFRESH_INTERVAL = 300 diff --git a/tarantool/mesh_connection.py b/tarantool/mesh_connection.py index a2d69c56..765512d9 100644 --- a/tarantool/mesh_connection.py +++ b/tarantool/mesh_connection.py @@ -4,13 +4,15 @@ between tarantool instances and basic Round-Robin strategy. ''' +import time from tarantool.connection import Connection from tarantool.error import NetworkError from tarantool.utils import ENCODING_DEFAULT from tarantool.const import ( SOCKET_TIMEOUT, RECONNECT_MAX_ATTEMPTS, - RECONNECT_DELAY + RECONNECT_DELAY, + NODES_REFRESH_INTERVAL ) @@ -34,12 +36,15 @@ def __init__(self, addrs, reconnect_delay=RECONNECT_DELAY, connect_now=True, encoding=ENCODING_DEFAULT, - strategy_class=RoundRobinStrategy): + strategy_class=RoundRobinStrategy, + nodes_refresh_interval=NODES_REFRESH_INTERVAL): self.nattempts = 2 * len(addrs) + 1 self.strategy = strategy_class(addrs) addr = self.strategy.getnext() host = addr['host'] port = addr['port'] + self.nodes_refresh_interval = nodes_refresh_interval + self.last_nodes_refresh = 0 super(MeshConnection, self).__init__(host=host, port=port, user=user, @@ -63,3 +68,28 @@ def _opt_reconnect(self): self.port = addr['port'] else: raise NetworkError + + if self.authenticated: + now = time.time() + if now - self.last_nodes_refresh > self.nodes_refresh_interval: + self.refresh_nodes(now) + + def refresh_nodes(self, cur_time): + resp = super(MeshConnection, self).eval_ex('return get_nodes ~= nil', + [], reconnect=False) + if not (resp.data and resp.data[0]): + return + + resp = super(MeshConnection, self).call_ex('get_nodes', [], + reconnect=False) + + if not (resp.data and resp.data[0]): + return + + addrs = resp.data[0] + if type(addrs) is list: + self.strategy.addrs = addrs + self.strategy.pos = 0 + self.last_nodes_refresh = cur_time + if not {'host': self.host, 'port': self.port} in addrs: + self._opt_reconnect() diff --git a/unit/suites/test_reconnect.py b/unit/suites/test_reconnect.py index 7a6a746e..482f0c80 100644 --- a/unit/suites/test_reconnect.py +++ b/unit/suites/test_reconnect.py @@ -66,6 +66,64 @@ def test_02_wrong_auth(self): con.close() self.srv.stop() + def test_03_mesh(self): + # Start two servers + self.srv.start() + self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })") + self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')") + + self.srv2 = TarantoolServer() + self.srv2.script = 'unit/suites/box.lua' + self.srv2.start() + self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })") + self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')") + + get_nodes = " \ + function get_nodes() \ + return { \ + { \ + host = '%s', \ + port = tonumber(%d) \ + }, \ + { \ + host = '%s', \ + port = tonumber(%d) \ + } \ + } \ + end" % (self.srv.host, self.srv.args['primary'], self.srv2.host, self.srv2.args['primary']) + + # Create get_nodes function + self.srv.admin(get_nodes) + self.srv2.admin(get_nodes) + + # Create srv_id function (for testing purposes) + self.srv.admin("function srv_id() return 1 end") + self.srv2.admin("function srv_id() return 2 end") + + # Create a mesh connection, pass only the first server address + con = tarantool.MeshConnection([{ + 'host': self.srv.host, 'port': self.srv.args['primary']}], + user='test', + password='test', + connect_now=True) + + # Check we work with the first server + resp = con.call('srv_id') + self.assertIs(resp.data and resp.data[0] == 1, True) + + # Stop the first server. + self.srv.stop() + + # Check we work with the second server + resp = con.call('srv_id') + self.assertIs(resp.data and resp.data[0] == 2, True) + + # Stop the second server. + self.srv2.stop() + + # Close the connection + con.close() + @classmethod def tearDownClass(self): self.srv.clean()