Skip to content

Commit

Permalink
Implement getting the cluster nodes list from nodes
Browse files Browse the repository at this point in the history
Periodically call a user-defined Lua function on the current node to
obtain or refresh the full cluster nodes list.

Resolves: #134
  • Loading branch information
msiomkin committed Mar 23, 2019
1 parent a6b6d56 commit 746a084
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 6 deletions.
49 changes: 45 additions & 4 deletions tarantool/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def __init__(self, host, port,
self.encoding = encoding
self.call_16 = call_16
self.connection_timeout = connection_timeout
self.authenticated = False
if connect_now:
self.connect()

Expand Down Expand Up @@ -200,9 +201,11 @@ def connect(self):
:raise: `NetworkError`
'''
try:
self.authenticated = False
self.connect_basic()
self.handshake()
self.load_schema()
self.authenticated = True
except Exception as e:
self.connected = False
raise NetworkError(e)
Expand Down Expand Up @@ -370,14 +373,33 @@ def call(self, func_name, *args):
:rtype: `Response` instance
'''
assert isinstance(func_name, str)

# This allows to use a tuple or list as an argument
if len(args) == 1 and isinstance(args[0], (list, tuple)):
args = args[0]

return self.call_ex(func_name, args, True)

def call_ex(self, func_name, args=[], reconnect=True):
'''
Execute CALL request. Call stored Lua function.
:param func_name: stored Lua function name
:type func_name: str
:param args: list of function arguments
:type args: list or tuple
:param reconnect: reconnect before call
:type reconnect: boolean
:rtype: `Response` instance
'''
assert isinstance(func_name, str)

request = RequestCall(self, func_name, args, self.call_16)
response = self._send_request(request)
if reconnect:
response = self._send_request(request)
else:
response = self._send_request_wo_reconnect(request)
return response

def eval(self, expr, *args):
Expand All @@ -391,14 +413,33 @@ def eval(self, expr, *args):
:rtype: `Response` instance
'''
assert isinstance(expr, str)

# This allows to use a tuple or list as an argument
if len(args) == 1 and isinstance(args[0], (list, tuple)):
args = args[0]

return self.eval_ex(expr, args, True)

def eval_ex(self, expr, args=[], reconnect=True):
'''
Execute EVAL request. Eval Lua expression.
:param expr: Lua expression
:type expr: str
:param args: list of function arguments
:type args: list or tuple
:param reconnect: reconnect before call
:type reconnect: boolean
:rtype: `Response` instance
'''
assert isinstance(expr, str)

request = RequestEval(self, expr, args)
response = self._send_request(request)
if reconnect:
response = self._send_request(request)
else:
response = self._send_request_wo_reconnect(request)
return response

def replace(self, space_name, values):
Expand Down
2 changes: 2 additions & 0 deletions tarantool/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,5 @@
RECONNECT_MAX_ATTEMPTS = 10
# Default delay between attempts to reconnect (seconds)
RECONNECT_DELAY = 0.1
# Default cluster nodes list refresh interval (seconds)
NODES_REFRESH_INTERVAL = 300
38 changes: 36 additions & 2 deletions tarantool/mesh_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
between tarantool instances and basic Round-Robin strategy.
'''

import time
from tarantool.connection import Connection
from tarantool.error import NetworkError
from tarantool.utils import ENCODING_DEFAULT
from tarantool.const import (
SOCKET_TIMEOUT,
RECONNECT_MAX_ATTEMPTS,
RECONNECT_DELAY
RECONNECT_DELAY,
NODES_REFRESH_INTERVAL
)


Expand All @@ -34,12 +36,16 @@ def __init__(self, addrs,
reconnect_delay=RECONNECT_DELAY,
connect_now=True,
encoding=ENCODING_DEFAULT,
strategy_class=RoundRobinStrategy):
strategy_class=RoundRobinStrategy,
nodes_refresh_interval=NODES_REFRESH_INTERVAL):
self.nattempts = 2 * len(addrs) + 1
self.strategy = strategy_class(addrs)
self.strategy_class = strategy_class
addr = self.strategy.getnext()
host = addr['host']
port = addr['port']
self.nodes_refresh_interval = nodes_refresh_interval
self.last_nodes_refresh = 0
super(MeshConnection, self).__init__(host=host,
port=port,
user=user,
Expand All @@ -63,3 +69,31 @@ def _opt_reconnect(self):
self.port = addr['port']
else:
raise NetworkError

if self.authenticated:
now = time.time()
if now - self.last_nodes_refresh > self.nodes_refresh_interval:
self.refresh_nodes(now)

def refresh_nodes(self, cur_time):
resp = super(MeshConnection, self).eval_ex('return get_nodes ~= nil',
[], reconnect=False)
if not (resp.data and resp.data[0]):
return

resp = super(MeshConnection, self).call_ex('get_nodes', [],
reconnect=False)

if not (resp.data and resp.data[0]):
return

addrs = resp.data[0]
if type(addrs) is list:
self.strategy = self.strategy_class(addrs)
self.last_nodes_refresh = cur_time
if not {'host': self.host, 'port': self.port} in addrs:
addr = self.strategy.getnext()
self.host = addr['host']
self.port = addr['port']
self.close()
self._opt_reconnect()
108 changes: 108 additions & 0 deletions unit/suites/test_reconnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,114 @@ def test_02_wrong_auth(self):
con.close()
self.srv.stop()

def test_03_mesh(self):
# Start two servers
self.srv.start()
self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")

self.srv2 = TarantoolServer()
self.srv2.script = 'unit/suites/box.lua'
self.srv2.start()
self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")

# get_nodes function contains both servers' addresses
get_nodes = " \
function get_nodes() \
return { \
{ \
host = '%s', \
port = tonumber(%d) \
}, \
{ \
host = '%s', \
port = tonumber(%d) \
} \
} \
end" % (self.srv.host, self.srv.args['primary'], self.srv2.host, self.srv2.args['primary'])

# Create get_nodes function on servers
self.srv.admin(get_nodes)
self.srv2.admin(get_nodes)

# Create srv_id function (for testing purposes)
self.srv.admin("function srv_id() return 1 end")
self.srv2.admin("function srv_id() return 2 end")

# Create a mesh connection, pass only the first server address
con = tarantool.MeshConnection([{
'host': self.srv.host, 'port': self.srv.args['primary']}],
user='test',
password='test',
connect_now=True)

# Check we work with the first server
resp = con.call('srv_id')
self.assertIs(resp.data and resp.data[0] == 1, True)

# Stop the first server
self.srv.stop()

# Check we work with the second server
resp = con.call('srv_id')
self.assertIs(resp.data and resp.data[0] == 2, True)

# Stop the second server
self.srv2.stop()

# Close the connection
con.close()

def test_04_mesh_exclude_node(self):
# Start two servers
self.srv.start()
self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")

self.srv2 = TarantoolServer()
self.srv2.script = 'unit/suites/box.lua'
self.srv2.start()
self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")

# get_nodes function contains only the second server address
get_nodes = " \
function get_nodes() \
return { \
{ \
host = '%s', \
port = tonumber(%d) \
} \
} \
end" % (self.srv2.host, self.srv2.args['primary'])

# Create get_nodes function on servers
self.srv.admin(get_nodes)
self.srv2.admin(get_nodes)

# Create srv_id function (for testing purposes)
self.srv.admin("function srv_id() return 1 end")
self.srv2.admin("function srv_id() return 2 end")

# Create a mesh connection, pass only the first server address
con = tarantool.MeshConnection([{
'host': self.srv.host, 'port': self.srv.args['primary']}],
user='test',
password='test',
connect_now=True)

# Check we work with the second server
resp = con.call('srv_id')
self.assertIs(resp.data and resp.data[0] == 2, True)

# Stop servers
self.srv.stop()
self.srv2.stop()

# Close the connection
con.close()

@classmethod
def tearDownClass(self):
self.srv.clean()

0 comments on commit 746a084

Please sign in to comment.