Skip to content

Commit

Permalink
Remove post_launch bootstrap phase
Browse files Browse the repository at this point in the history
With this commit we remove the unused bootstrap phase `post_launch`
which was intended to provide a car/plugin bootstrap phase after
Elasticsearch has been launched.

Relates elastic#481
  • Loading branch information
danielmitterdorfer committed May 23, 2018
1 parent 94f27af commit c2e7ba3
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 50 deletions.
7 changes: 1 addition & 6 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,17 @@ class ClusterLauncher:
The cluster launcher performs cluster-wide tasks that need to be done in the startup / shutdown phase.
"""
def __init__(self, cfg, metrics_store, on_post_launch=None, client_factory_class=client.EsClientFactory):
def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFactory):
"""
Creates a new ClusterLauncher.
:param cfg: The config object.
:param metrics_store: A metrics store that is configured to receive system metrics.
:param on_post_launch: An optional function that takes the Elasticsearch client as a parameter. It is invoked after the
REST API is available.
:param client_factory_class: A factory class that can create an Elasticsearch client.
"""
self.cfg = cfg
self.metrics_store = metrics_store
self.on_post_launch = on_post_launch
self.client_factory = client_factory_class

def start(self):
Expand Down Expand Up @@ -98,8 +95,6 @@ def start(self):
logger.error("REST API layer is not yet available. Forcefully terminating cluster.")
self.stop(c)
raise exceptions.LaunchError("Elasticsearch REST API layer is not available. Forcefully terminated cluster.")
if self.on_post_launch:
self.on_post_launch(es_default)
return c

def stop(self, c):
Expand Down
19 changes: 1 addition & 18 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,7 @@ def receiveMsg_NodesStopped(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)

def on_all_nodes_started(self):
# we might not have a car if we benchmark external clusters
post_launch_handler = PostLaunchHandler([self.car]) if self.car else None
self.cluster_launcher = launcher.ClusterLauncher(self.cfg, self.metrics_store, on_post_launch=post_launch_handler)
self.cluster_launcher = launcher.ClusterLauncher(self.cfg, self.metrics_store)
# Workaround because we could raise a LaunchError here and thespian will attempt to retry a failed message.
# In that case, we will get a followup RallyAssertionError because on the second attempt, Rally will check
# the status which is now "nodes_started" but we expected the status to be "nodes_starting" previously.
Expand Down Expand Up @@ -410,21 +408,6 @@ def on_all_nodes_stopped(self):
# do not self-terminate, let the parent actor handle this


class PostLaunchHandler:
def __init__(self, components, hook_handler_class=team.BootstrapHookHandler):
self.handlers = []
if components:
for component in components:
handler = hook_handler_class(component)
if handler.can_load():
handler.load()
self.handlers.append(handler)

def __call__(self, client):
for handler in self.handlers:
handler.invoke(team.BootstrapPhase.post_launch.name, client=client)


@thespian.actors.requireCapability('coordinator')
class Dispatcher(thespian.actors.ActorTypeDispatcher):
def __init__(self):
Expand Down
1 change: 0 additions & 1 deletion esrally/mechanic/team.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ def __eq__(self, other):

class BootstrapPhase(Enum):
post_install = 10
post_launch = 20

@classmethod
def valid(cls, name):
Expand Down
26 changes: 2 additions & 24 deletions tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,7 @@ class ClusterLauncherTests(TestCase):
test_host = opts.TargetHosts("10.0.0.10:9200,10.0.0.11:9200")
client_options = opts.ClientOptions('timeout:60')

def test_launches_cluster_with_post_launch_handler(self):
on_post_launch = mock.Mock()
cfg = config.Config()
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", self.client_options)
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(),
on_post_launch=on_post_launch, client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()

self.assertEqual([{"host": "10.0.0.10", "port":9200}, {"host": "10.0.0.11", "port":9200}], cluster.hosts)
self.assertIsNotNone(cluster.telemetry)
# this requires at least Python 3.6
# on_post_launch.assert_called_once()
self.assertEqual(1, on_post_launch.call_count)

def test_launches_cluster_without_post_launch_handler(self):
def test_launches_cluster(self):
cfg = config.Config()
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", self.client_options)
Expand Down Expand Up @@ -161,18 +143,14 @@ def test_launches_cluster_with_telemetry_client_timeout_enabled(self):

@mock.patch("time.sleep")
def test_error_on_cluster_launch(self, sleep):
on_post_launch = mock.Mock()
cfg = config.Config()
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
# Simulate that the client will raise an error upon startup
cfg.add(config.Scope.application, "client", "options", opts.ClientOptions("raise-error-on-info:true"))
#cfg.add(config.Scope.application, "client", "options", {"raise-error-on-info": True})
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(),
on_post_launch=on_post_launch, client_factory_class=MockClientFactory)
cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
with self.assertRaisesRegex(exceptions.LaunchError,
"Elasticsearch REST API layer is not available. Forcefully terminated cluster."):
cluster_launcher.start()
self.assertEqual(0, on_post_launch.call_count)
2 changes: 1 addition & 1 deletion tests/mechanic/team_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,5 @@ def test_cannot_register_for_unknown_phase(self):
handler.loader.registration_function = hook
with self.assertRaises(exceptions.SystemSetupError) as ctx:
handler.load()
self.assertEqual("Unknown bootstrap phase [this_is_an_unknown_install_phase]. Valid phases are: ['post_install', 'post_launch'].",
self.assertEqual("Unknown bootstrap phase [this_is_an_unknown_install_phase]. Valid phases are: ['post_install'].",
ctx.exception.args[0])

0 comments on commit c2e7ba3

Please sign in to comment.