Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post_launch phase for Elasticsearch plugins #481

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,35 @@ def wait_for_rest_layer(es, max_attempts=20):


class ClusterLauncher:
def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFactory):
"""
The cluster launcher performs cluster-wide tasks that need to be done in the startup / shutdown phase.

"""
def __init__(self, cfg, metrics_store, on_post_launch=None, client_factory_class=client.EsClientFactory):
"""

Creates a new ClusterLauncher.

:param cfg: The config object.
:param metrics_store: A metrics store that is configured to receive system metrics.
:param on_post_launch: An optional function that takes the Elasticsearch client as a parameter. It is invoked after the
REST API is available.
:param client_factory_class: A factory class that can create an Elasticsearch client.
"""
self.cfg = cfg
self.metrics_store = metrics_store
self.on_post_launch = on_post_launch
self.client_factory = client_factory_class

def start(self):
"""
Performs final startup tasks.

Precondition: All cluster nodes have been started.
Postcondition: The cluster is ready to receive HTTP requests or a ``LaunchError`` is raised.

:return: A representation of the launched cluster.
"""
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
hosts = self.cfg.opts("client", "hosts")
Expand Down Expand Up @@ -64,10 +87,16 @@ def start(self):
logger.error("REST API layer is not yet available. Forcefully terminating cluster.")
self.stop(c)
raise exceptions.LaunchError("Elasticsearch REST API layer is not available. Forcefully terminated cluster.")

if self.on_post_launch:
self.on_post_launch(es)
return c

def stop(self, c):
"""
Performs cleanup tasks. This method should be called before nodes are shut down.

:param c: The cluster that is about to be stopped.
"""
c.telemetry.detach_from_cluster(c)


Expand Down
35 changes: 29 additions & 6 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def __init__(self):
self.race_control = None
self.cluster_launcher = None
self.cluster = None
self.plugins = None

def receiveUnrecognizedMessage(self, msg, sender):
logger.info("MechanicActor#receiveMessage unrecognized(msg = [%s] sender = [%s])" % (str(type(msg)), str(sender)))
Expand Down Expand Up @@ -256,6 +257,7 @@ def receiveMsg_StartEngine(self, msg, sender):
cls = metrics.metrics_store_class(self.cfg)
self.metrics_store = cls(self.cfg)
self.metrics_store.open(ctx=msg.open_metrics_context)
_, self.plugins = load_team(self.cfg, msg.external)

# In our startup procedure we first create all mechanics. Only if this succeeds we'll continue.
hosts = self.cfg.opts("client", "hosts")
Expand Down Expand Up @@ -345,7 +347,8 @@ def receiveMsg_NodesStopped(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)

def on_all_nodes_started(self):
self.cluster_launcher = launcher.ClusterLauncher(self.cfg, self.metrics_store)
plugin_handler = PostLaunchPluginHandler(self.plugins)
self.cluster_launcher = launcher.ClusterLauncher(self.cfg, self.metrics_store, on_post_launch=plugin_handler)
# Workaround because we could raise a LaunchError here and thespian will attempt to retry a failed message.
# In that case, we will get a followup RallyAssertionError because on the second attempt, Rally will check
# the status which is now "nodes_started" but we expected the status to be "nodes_starting" previously.
Expand Down Expand Up @@ -392,6 +395,21 @@ def on_all_nodes_stopped(self):
# do not self-terminate, let the parent actor handle this


class PostLaunchPluginHandler:
def __init__(self, plugins, hook_handler_class=team.PluginBootstrapHookHandler):
self.handlers = []
if plugins:
for plugin in plugins:
handler = hook_handler_class(plugin)
if handler.can_load():
handler.load()
self.handlers.append(handler)

def __call__(self, client):
for handler in self.handlers:
handler.invoke(team.PluginBootstrapPhase.post_launch.name, client=client)


@thespian.actors.requireCapability('coordinator')
class Dispatcher(thespian.actors.ActorTypeDispatcher):
def __init__(self):
Expand Down Expand Up @@ -590,11 +608,7 @@ def receiveUnrecognizedMessage(self, msg, sender):
# Internal API (only used by the actor and for tests)
#####################################################

def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
docker=False):
races_root = paths.races_root(cfg)
challenge_root_path = paths.race_root(cfg)
node_ids = cfg.opts("provisioning", "node.ids", mandatory=False)
def load_team(cfg, external):
# externally provisioned clusters do not support cars / plugins
if external:
car = None
Expand All @@ -603,6 +617,15 @@ def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=Fals
team_path = team.team_path(cfg)
car = team.load_car(team_path, cfg.opts("mechanic", "car.names"), cfg.opts("mechanic", "car.params"))
plugins = team.load_plugins(team_path, cfg.opts("mechanic", "car.plugins"), cfg.opts("mechanic", "plugin.params"))
return car, plugins


def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
docker=False):
races_root = paths.races_root(cfg)
challenge_root_path = paths.race_root(cfg)
node_ids = cfg.opts("provisioning", "node.ids", mandatory=False)
car, plugins = load_team(cfg, external)

if sources or distribution:
s = supplier.create(cfg, sources, distribution, build, challenge_root_path, plugins)
Expand Down
69 changes: 5 additions & 64 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import glob
import shutil
import logging
from enum import Enum

import jinja2

from esrally import exceptions
from esrally.utils import io, console, process, modules, versions
from esrally.mechanic import team
from esrally.utils import io, console, process, versions

logger = logging.getLogger("rally.provisioner")

Expand Down Expand Up @@ -102,21 +102,6 @@ def cleanup(preserve, install_dir, data_paths):
logger.exception("Could not delete [%s]. Skipping..." % install_dir)


class ProvisioningPhase(Enum):
post_install = 10

@classmethod
def valid(cls, name):
for n in ProvisioningPhase.names():
if n == name:
return True
return False

@classmethod
def names(cls):
return [p.name for p in list(ProvisioningPhase)]


def _apply_config(source_root_path, target_root_path, config_vars):
for root, dirs, files in os.walk(source_root_path):
env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))
Expand Down Expand Up @@ -173,7 +158,7 @@ def prepare(self, binary):

for installer in self.plugin_installers:
# Never let install hooks modify our original provisioner variables and just provide a copy!
installer.invoke_install_hook(ProvisioningPhase.post_install, provisioner_vars.copy())
installer.invoke_install_hook(team.PluginBootstrapPhase.post_install, provisioner_vars.copy())

return NodeConfiguration(self.es_installer.car, self.es_installer.node_ip, self.es_installer.node_name,
self.es_installer.node_root_dir, self.es_installer.es_home_path, self.es_installer.node_log_dir,
Expand Down Expand Up @@ -297,52 +282,8 @@ def _data_paths(self):
return [os.path.join(self.es_home_path, "data")]


class InstallHookHandler:
def __init__(self, plugin, loader_class=modules.ComponentLoader):
self.plugin = plugin
# Don't allow the loader to recurse. The subdirectories may contain Elasticsearch specific files which we do not want to add to
# Rally's Python load path. We may need to define a more advanced strategy in the future.
self.loader = loader_class(root_path=self.plugin.root_path, component_entry_point="plugin", recurse=False)
self.hooks = {}

def can_load(self):
return self.loader.can_load()

def load(self):
root_module = self.loader.load()
try:
# every module needs to have a register() method
root_module.register(self)
except exceptions.RallyError:
# just pass our own exceptions transparently.
raise
except BaseException:
msg = "Could not load install hooks in [%s]" % self.loader.root_path
logger.exception(msg)
raise exceptions.SystemSetupError(msg)

def register(self, phase, hook):
logger.info("Registering install hook [%s] for phase [%s] in plugin [%s]" % (hook.__name__, phase, self.plugin.name))
if not ProvisioningPhase.valid(phase):
raise exceptions.SystemSetupError("Provisioning phase [%s] is unknown. Valid phases are: %s." %
(phase, ProvisioningPhase.names()))
if phase not in self.hooks:
self.hooks[phase] = []
self.hooks[phase].append(hook)

def invoke(self, phase, variables):
if phase in self.hooks:
logger.info("Invoking phase [%s] for plugin [%s] in config [%s]" % (phase, self.plugin.name, self.plugin.config))
for hook in self.hooks[phase]:
logger.info("Invoking hook [%s]." % hook.__name__)
# hooks should only take keyword arguments to be forwards compatible with Rally!
hook(config_names=self.plugin.config, variables=variables)
else:
logger.debug("Plugin [%s] in config [%s] has no hook registered for phase [%s]." % (self.plugin.name, self.plugin.config, phase))


class PluginInstaller:
def __init__(self, plugin, hook_handler_class=InstallHookHandler):
def __init__(self, plugin, hook_handler_class=team.PluginBootstrapHookHandler):
self.plugin = plugin
self.hook_handler = hook_handler_class(self.plugin)
if self.hook_handler.can_load():
Expand Down Expand Up @@ -371,7 +312,7 @@ def install(self, es_home_path, plugin_url=None):
(self.plugin_name, str(return_code)))

def invoke_install_hook(self, phase, variables):
self.hook_handler.invoke(phase.name, variables)
self.hook_handler.invoke(phase.name, variables=variables)

@property
def variables(self):
Expand Down
62 changes: 61 additions & 1 deletion esrally/mechanic/team.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import os
import logging
import configparser
from enum import Enum

import tabulate

from esrally import exceptions, PROGRAM_NAME
from esrally.utils import console, repo, io
from esrally.utils import console, repo, io, modules

logger = logging.getLogger("rally.team")

Expand Down Expand Up @@ -341,3 +342,62 @@ def __hash__(self):

def __eq__(self, other):
return isinstance(other, type(self)) and (self.name, self.config, self.core_plugin) == (other.name, other.config, other.core_plugin)


class PluginBootstrapPhase(Enum):
post_install = 10
post_launch = 20

@classmethod
def valid(cls, name):
for n in PluginBootstrapPhase.names():
if n == name:
return True
return False

@classmethod
def names(cls):
return [p.name for p in list(PluginBootstrapPhase)]


class PluginBootstrapHookHandler:
def __init__(self, plugin, loader_class=modules.ComponentLoader):
self.plugin = plugin
# Don't allow the loader to recurse. The subdirectories may contain Elasticsearch specific files which we do not want to add to
# Rally's Python load path. We may need to define a more advanced strategy in the future.
self.loader = loader_class(root_path=self.plugin.root_path, component_entry_point="plugin", recurse=False)
self.hooks = {}

def can_load(self):
return self.loader.can_load()

def load(self):
root_module = self.loader.load()
try:
# every module needs to have a register() method
root_module.register(self)
except exceptions.RallyError:
# just pass our own exceptions transparently.
raise
except BaseException:
msg = "Could not load plugin bootstrap hooks in [{}]".format(self.loader.root_path)
logger.exception(msg)
raise exceptions.SystemSetupError(msg)

def register(self, phase, hook):
logger.info("Registering plugin bootstrap hook [%s] for phase [%s] in plugin [%s]", hook.__name__, phase, self.plugin.name)
if not PluginBootstrapPhase.valid(phase):
raise exceptions.SystemSetupError("Phase [{}] is unknown. Valid phases are: {}.".format(phase, PluginBootstrapPhase.names()))
if phase not in self.hooks:
self.hooks[phase] = []
self.hooks[phase].append(hook)

def invoke(self, phase, **kwargs):
if phase in self.hooks:
logger.info("Invoking phase [%s] for plugin [%s] in config [%s]", phase, self.plugin.name, self.plugin.config)
for hook in self.hooks[phase]:
logger.info("Invoking hook [%s].", hook.__name__)
# hooks should only take keyword arguments to be forwards compatible with Rally!
hook(config_names=self.plugin.config, **kwargs)
else:
logger.debug("Plugin [%s] in config [%s] has no hook registered for phase [%s].", self.plugin.name, self.plugin.config, phase)
Loading