Skip to content

Commit

Permalink
Allow multiple nodes per IP
Browse files Browse the repository at this point in the history
With this commit we expose a new provisioner variable `all_node_names`
which consists of all Elasticsearch node names in the cluster. This can
be used to set `cluster.initial_master_nodes` in order to ensure the
discovery process takes all nodes into account even if they are started
on the same host. Previously we have used the IP address for this
purpose but this is ambiguous when multiple nodes are started on the
same host.

Relates #804
  • Loading branch information
danielmitterdorfer authored Oct 28, 2019
1 parent cf0d16f commit 0e72e34
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
29 changes: 20 additions & 9 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build,
self.port = port
self.node_id = node_id

def for_nodes(self, all_node_ips=None, ip=None, port=None, node_ids=None):
def for_nodes(self, all_node_ips=None, all_node_ids=None, ip=None, port=None, node_ids=None):
"""
Creates a StartNodes instance for a concrete IP, port and their associated node_ids.
:param all_node_ips: The IPs of all nodes in the cluster (including the current one).
:param all_node_ids: The numeric id of all nodes in the cluster (including the current one).
:param ip: The IP to set.
:param port: The port number to set.
:param node_ids: A list of node id to set.
:return: A corresponding ``StartNodes`` message with the specified IP, port number and node ids.
"""
return StartNodes(self.cfg, self.open_metrics_context, self.cluster_settings, self.sources, self.build, self.distribution,
self.external, self.docker, all_node_ips, ip, port, node_ids)
self.external, self.docker, all_node_ips, all_node_ids, ip, port, node_ids)


class EngineStarted:
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(self, reset_in_seconds):

class StartNodes:
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build, distribution, external, docker,
all_node_ips, ip, port, node_ids):
all_node_ips, all_node_ids, ip, port, node_ids):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.cluster_settings = cluster_settings
Expand All @@ -109,6 +110,7 @@ def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build,
self.external = external
self.docker = docker
self.all_node_ips = all_node_ips
self.all_node_ids = all_node_ids
self.ip = ip
self.port = port
self.node_ids = node_ids
Expand Down Expand Up @@ -175,6 +177,13 @@ def extract_all_node_ips(ip_port_pairs):
return all_node_ips


def extract_all_node_ids(all_nodes_by_host):
all_node_ids = set()
for node_ids_per_host in all_nodes_by_host.values():
all_node_ids.update(node_ids_per_host)
return all_node_ids


def nodes_by_host(ip_port_pairs):
nodes = {}
node_id = 0
Expand Down Expand Up @@ -361,9 +370,11 @@ def receiveMsg_StartEngine(self, startmsg, sender):
self.remotes = defaultdict(list)
all_ips_and_ports = to_ip_port(startmsg.hosts)
all_node_ips = extract_all_node_ips(all_ips_and_ports)
all_nodes_by_host = nodes_by_host(all_ips_and_ports)
all_node_ids = extract_all_node_ids(all_nodes_by_host)

for (ip, port), node in nodes_by_host(all_ips_and_ports).items():
submsg = startmsg.for_nodes(all_node_ips, ip, port, node)
for (ip, port), node in all_nodes_by_host.items():
submsg = startmsg.for_nodes(all_node_ips, all_node_ids, ip, port, node)
submsg.reply_to = sender
if '127.0.0.1' == ip:
m = self.createActor(NodeMechanicActor,
Expand Down Expand Up @@ -461,8 +472,8 @@ def receiveMsg_StartNodes(self, msg, sender):
self.metrics_store.open(ctx=msg.open_metrics_context)
# avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.

self.mechanic = create(self.config, self.metrics_store, msg.all_node_ips, msg.cluster_settings, msg.sources, msg.build,
msg.distribution, msg.external, msg.docker)
self.mechanic = create(self.config, self.metrics_store, msg.all_node_ips, msg.all_node_ids, msg.cluster_settings,
msg.sources, msg.build, msg.distribution, msg.external, msg.docker)
nodes = self.mechanic.start_engine()
self.running = True
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
Expand Down Expand Up @@ -535,7 +546,7 @@ def load_team(cfg, external):
return car, plugins


def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
def create(cfg, metrics_store, all_node_ips, all_node_ids, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
docker=False):
races_root = paths.races_root(cfg)
challenge_root_path = paths.race_root(cfg)
Expand All @@ -546,7 +557,7 @@ def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=Fals
s = supplier.create(cfg, sources, distribution, build, challenge_root_path, car, plugins)
p = []
for node_id in node_ids:
p.append(provisioner.local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, challenge_root_path, node_id))
p.append(provisioner.local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, all_node_ids, challenge_root_path, node_id))
l = launcher.ProcessLauncher(cfg, metrics_store, races_root)
elif external:
raise exceptions.RallyAssertionError("Externally provisioned clusters should not need to be managed by Rally's mechanic")
Expand Down
9 changes: 6 additions & 3 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from esrally.utils import console, io, process, versions


def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id):
def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, all_node_ids, target_root, node_id):
distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
ip = cfg.opts("provisioning", "node.ip")
http_port = cfg.opts("provisioning", "node.http.port")
Expand All @@ -36,10 +36,11 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_

node_name = "%s-%d" % (node_name_prefix, node_id)
node_root_dir = "%s/%s" % (target_root, node_name)
all_node_names = ["%s-%d" % (node_name_prefix, n) for n in all_node_ids]

_, java_home = java_resolver.java_home(car, cfg)

es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port)
es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, all_node_names, ip, http_port)
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version)
Expand Down Expand Up @@ -219,7 +220,7 @@ def _provisioner_variables(self):


class ElasticsearchInstaller:
def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port, hook_handler_class=team.BootstrapHookHandler):
def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, all_node_names, ip, http_port, hook_handler_class=team.BootstrapHookHandler):
self.car = car
self.java_home = java_home
self.node_name = node_name
Expand All @@ -228,6 +229,7 @@ def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, ip, h
self.node_log_dir = "%s/logs/server" % node_root_dir
self.heap_dump_dir = "%s/heapdump" % node_root_dir
self.all_node_ips = all_node_ips
self.all_node_names = all_node_names
self.node_ip = ip
self.http_port = http_port
self.hook_handler = hook_handler_class(self.car)
Expand Down Expand Up @@ -278,6 +280,7 @@ def variables(self):
"http_port": "%d-%d" % (self.http_port, self.http_port + 100),
"transport_port": "%d-%d" % (self.http_port + 100, self.http_port + 200),
"all_node_ips": "[\"%s\"]" % "\",\"".join(self.all_node_ips),
"all_node_names": "[\"%s\"]" % "\",\"".join(self.all_node_names),
# at the moment we are strict and enforce that all nodes are master eligible nodes
"minimum_master_nodes": len(self.all_node_ips),
"install_root_path": self.es_home_path
Expand Down
13 changes: 13 additions & 0 deletions tests/mechanic/provisioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -81,6 +82,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-5.0.0"
}, config_vars)
Expand Down Expand Up @@ -152,6 +154,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -195,6 +198,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-5.0.0",
"plugin_name": "x-pack-security",
Expand Down Expand Up @@ -229,6 +233,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -272,6 +277,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-6.3.0",
"plugin_name": "x-pack-security",
Expand Down Expand Up @@ -304,6 +310,7 @@ def test_cleanup_nothing_on_preserve(self, mock_path_exists, mock_rm):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips={"127.0.0.1"},
all_node_names=["rally-node-0"],
ip="127.0.0.1",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -324,6 +331,7 @@ def test_cleanup(self, mock_path_exists, mock_rm):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips={"127.0.0.1"},
all_node_names=["rally-node-0"],
ip="127.0.0.1",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -344,6 +352,7 @@ def test_prepare_default_data_paths(self, mock_rm, mock_ensure_dir, mock_decompr
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -362,6 +371,7 @@ def test_prepare_default_data_paths(self, mock_rm, mock_ensure_dir, mock_decompr
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/install/elasticsearch-5.0.0-SNAPSHOT"
}, installer.variables)
Expand All @@ -380,6 +390,7 @@ def test_prepare_user_provided_data_path(self, mock_rm, mock_ensure_dir, mock_de
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir="~/.rally/benchmarks/races/unittest")
Expand All @@ -398,6 +409,7 @@ def test_prepare_user_provided_data_path(self, mock_rm, mock_ensure_dir, mock_de
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/install/elasticsearch-5.0.0-SNAPSHOT"
}, installer.variables)
Expand All @@ -412,6 +424,7 @@ def test_invokes_hook(self):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir="~/.rally/benchmarks/races/unittest",
Expand Down

0 comments on commit 0e72e34

Please sign in to comment.