diff --git a/esrally/actor.py b/esrally/actor.py
index 40aa7b3a9..720dc5077 100644
--- a/esrally/actor.py
+++ b/esrally/actor.py
@@ -25,6 +25,13 @@ def __init__(self, message, cause=None):
         self.cause = cause
 
 
+class BenchmarkCancelled:
+    """
+    Indicates that the benchmark has been cancelled (by the user).
+    """
+    pass
+
+
 class RallyActor(thespian.actors.Actor):
     def __init__(self):
         super().__init__()
diff --git a/esrally/driver/__init__.py b/esrally/driver/__init__.py
index 89f4aad7c..98605248d 100644
--- a/esrally/driver/__init__.py
+++ b/esrally/driver/__init__.py
@@ -1,2 +1,2 @@
 # expose only the minimum API
-from .driver import DriverActor, StartBenchmark, BenchmarkComplete, BenchmarkCancelled
+from .driver import DriverActor, StartBenchmark, BenchmarkComplete
diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py
index 6d2ff1b86..ee67fa6cc 100644
--- a/esrally/driver/driver.py
+++ b/esrally/driver/driver.py
@@ -105,13 +105,6 @@ def __init__(self, metrics):
         self.metrics = metrics
 
 
-class BenchmarkCancelled:
-    """
-    Indicates that the benchmark has been cancelled (by the user).
-    """
-    pass
-
-
 class DriverActor(actor.RallyActor):
     WAKEUP_INTERVAL_SECONDS = 1
     """
@@ -142,7 +135,7 @@ def receiveMessage(self, msg, sender):
             logger.error("Main driver received a fatal exception from a load generator. Shutting down.")
             self.coordinator.close()
             self.send(self.start_sender, msg)
-        elif isinstance(msg, BenchmarkCancelled):
+        elif isinstance(msg, actor.BenchmarkCancelled):
             logger.info("Main driver received a notification that the benchmark has been cancelled.")
             self.coordinator.close()
             self.send(self.start_sender, msg)
@@ -491,7 +484,7 @@ def receiveMessage(self, msg, sender):
                 if self.cancel.is_set():
                     logger.info("LoadGenerator[%s] has detected that benchmark has been cancelled. Notifying master..."
                                 % str(self.client_id))
-                    self.send(self.master, BenchmarkCancelled())
+                    self.send(self.master, actor.BenchmarkCancelled())
                 elif self.executor_future is not None and self.executor_future.done():
                     e = self.executor_future.exception(timeout=0)
                     if e:
diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py
index f47dbfae8..e9837c1f3 100644
--- a/esrally/racecontrol.py
+++ b/esrally/racecontrol.py
@@ -3,6 +3,8 @@
 import sys
 import tabulate
 
+import thespian.actors
+
 from esrally import actor, config, exceptions, track, driver, mechanic, reporter, metrics, time, PROGRAM_NAME
 from esrally.utils import console, convert
 
@@ -61,21 +63,126 @@ def __call__(self, cfg):
         self.target(cfg)
 
 
-class Benchmark:
+class Setup:
     def __init__(self, cfg, sources=False, build=False, distribution=False, external=False, docker=False):
         self.cfg = cfg
+        self.sources = sources
+        self.build = build
+        self.distribution = distribution
+        self.external = external
+        self.docker = docker
+
+
+class Success:
+    pass
+
+
+class BenchmarkActor(actor.RallyActor):
+    def __init__(self):
+        super().__init__()
+        actor.RallyActor.configure_logging(logger)
+        self.cfg = None
+        self.race = None
+        self.metrics_store = None
+        self.race_store = None
+        self.lap_counter = None
+        self.cancelled = False
+        self.error = False
+        self.start_sender = None
+        self.mechanic = None
+        self.main_driver = None
+
+    def receiveMessage(self, msg, sender):
+        try:
+            logger.debug("BenchmarkActor#receiveMessage(msg = [%s] sender = [%s])" % (str(type(msg)), str(sender)))
+            if isinstance(msg, Setup):
+                self.start_sender = sender
+                self.setup(msg)
+            elif isinstance(msg, mechanic.EngineStarted):
+                logger.info("Mechanic has started engine successfully.")
+                self.metrics_store.meta_info = msg.system_meta_info
+                cluster = msg.cluster_meta_info
+                self.race.cluster = cluster
+                console.info("Racing on track [%s], challenge [%s] and car [%s]\n"
+                             % (self.race.track_name, self.race.challenge_name, self.race.car))
+                # start running we assume that each race has at least one lap
+                self.run()
+            elif isinstance(msg, actor.BenchmarkCancelled):
+                self.cancelled = True
+                # no need to tell the obvious
+                if sender != self.start_sender:
+                    self.send(self.start_sender, msg)
+            elif isinstance(msg, actor.BenchmarkFailure):
+                self.error = True
+                self.send(self.start_sender, msg)
+            elif isinstance(msg, driver.BenchmarkComplete):
+                logger.info("Benchmark is complete.")
+                logger.info("Bulk adding request metrics to metrics store.")
+                self.metrics_store.bulk_add(msg.metrics)
+                self.send(self.main_driver, thespian.actors.ActorExitRequest())
+                self.main_driver = None
+                self.send(self.mechanic, mechanic.OnBenchmarkStop())
+            elif isinstance(msg, mechanic.BenchmarkStopped):
+                logger.info("Bulk adding system metrics to metrics store.")
+                self.metrics_store.bulk_add(msg.system_metrics)
+                logger.info("Flushing metrics data...")
+                self.metrics_store.flush()
+                logger.info("Flushing done")
+                self.lap_counter.after_lap()
+                if self.lap_counter.has_more_laps():
+                    self.run()
+                else:
+                    self.teardown()
+            elif isinstance(msg, mechanic.EngineStopped):
+                logger.info("Mechanic has stopped engine successfully.")
+                logger.info("Bulk adding system metrics to metrics store.")
+                self.metrics_store.bulk_add(msg.system_metrics)
+                self.metrics_store.flush()
+                if not self.cancelled and not self.error:
+                    final_results = reporter.calculate_results(self.metrics_store, self.race)
+                    self.race.add_final_results(final_results)
+                    reporter.summarize(self.race, self.cfg)
+                    self.race_store.store_race(self.race)
+                else:
+                    logger.info("Suppressing output of summary report. Cancelled = [%r], Error = [%r]." % (self.cancelled, self.error))
+                self.metrics_store.close()
+                self.send(self.start_sender, Success())
+            elif isinstance(msg, thespian.actors.ActorExitRequest):
+                if self.mechanic:
+                    self.send(self.mechanic, msg)
+                    self.mechanic = None
+                if self.main_driver:
+                    self.send(self.main_driver, msg)
+                    self.main_driver = None
+            else:
+                logger.info("BenchmarkActor received unknown message [%s] (ignoring)." % (str(msg)))
+        except BaseException as e:
+            self.error = True
+            logger.exception("BenchmarkActor encountered a fatal exception. Shutting down.")
+            self.send(self.start_sender, actor.BenchmarkFailure("Could not execute benchmark", e))
+
+    def setup(self, msg):
+        self.mechanic = self.createActor(mechanic.MechanicActor,
+                                         targetActorRequirements={"coordinator": True},
+                                         globalName="/rally/mechanic/coordinator")
+
+        self.cfg = msg.cfg
         # to load the track we need to know the correct cluster distribution version. Usually, this value should be set but there are rare
         # cases (external pipeline and user did not specify the distribution version) where we need to derive it ourselves. For source
         # builds we always assume "master"
-        if not sources and not self.cfg.exists("mechanic", "distribution.version"):
+        if not msg.sources and not self.cfg.exists("mechanic", "distribution.version"):
             distribution_version = mechanic.cluster_distribution_version(self.cfg)
             if not distribution_version:
                 raise exceptions.SystemSetupError("A distribution version is required. Please specify it with --distribution-version.")
             logger.info("Automatically derived distribution version [%s]" % distribution_version)
             self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version)
-        t = self._load_track()
-        challenge = self._find_challenge(t)
+        t = track.load_track(self.cfg)
+        challenge_name = self.cfg.opts("track", "challenge.name")
+        challenge = t.find_challenge_or_default(challenge_name)
+        if challenge is None:
+            raise exceptions.SystemSetupError("Track [%s] does not provide challenge [%s]. List the available tracks with %s list tracks."
+                                              % (t.name, challenge_name, PROGRAM_NAME))
         if challenge.user_info:
             console.info(challenge.user_info, logger=logger)
         self.race = metrics.create_race(self.cfg, t, challenge)
@@ -86,127 +193,28 @@ def __init__(self, cfg, sources=False, build=False, distribution=False, external
             challenge=self.race.challenge_name,
             read_only=False
         )
+        self.lap_counter = LapCounter(self.race, self.metrics_store, self.cfg)
         self.race_store = metrics.race_store(self.cfg)
-        self.sources = sources
-        self.build = build
-        self.distribution = distribution
-        self.external = external
-        self.docker = docker
-        self.actor_system = None
-        self.mechanic = None
-
-    def _load_track(self):
-        return track.load_track(self.cfg)
-
-    def _find_challenge(self, t):
-        challenge_name = self.cfg.opts("track", "challenge.name")
-        challenge = t.find_challenge_or_default(challenge_name)
-        if challenge is None:
-            raise exceptions.SystemSetupError("Track [%s] does not provide challenge [%s]. List the available tracks with %s list tracks."
-                                              % (t.name, challenge_name, PROGRAM_NAME))
-        return challenge
-
-    def setup(self):
-        # at this point an actor system has to run and we should only join
-        self.actor_system = actor.bootstrap_actor_system(try_join=True)
-        self.mechanic = self.actor_system.createActor(mechanic.MechanicActor,
-                                                      targetActorRequirements={"coordinator": True},
-                                                      globalName="/rally/mechanic/coordinator")
         logger.info("Asking mechanic to start the engine.")
         cluster_settings = self.race.challenge.cluster_settings
-        result = self.actor_system.ask(self.mechanic,
-                                       mechanic.StartEngine(
-                                           self.cfg, self.metrics_store.open_context, cluster_settings,
-                                           self.sources, self.build, self.distribution, self.external, self.docker))
-        if isinstance(result, mechanic.EngineStarted):
-            logger.info("Mechanic has started engine successfully.")
-            self.metrics_store.meta_info = result.system_meta_info
-            cluster = result.cluster_meta_info
-            self.race.cluster = cluster
-            console.info("Racing on track [%s], challenge [%s] and car [%s]\n"
-                         % (self.race.track_name, self.race.challenge_name, self.race.car))
-        elif isinstance(result, actor.BenchmarkFailure):
-            logger.info("Starting engine has failed. Reason [%s]." % result.message)
-            raise exceptions.RallyError(result.message)
-        else:
-            raise exceptions.RallyError("Mechanic has not started engine but instead [%s]. Terminating race without result." % str(result))
+        self.send(self.mechanic, mechanic.StartEngine(self.cfg, self.metrics_store.open_context, cluster_settings, msg.sources, msg.build,
+                                                      msg.distribution, msg.external, msg.docker))
 
-    def run(self, lap):
-        """
-        Runs the provided lap of a benchmark.
-
-        :param lap: The current lap number.
-        :return: True iff the benchmark may go on. False iff the user has cancelled the benchmark.
-        """
+    def run(self):
+        self.lap_counter.before_lap()
+        lap = self.lap_counter.current_lap
         self.metrics_store.lap = lap
-        logger.info("Notifying mechanic of benchmark start.")
-        # we could use #tell() here but then the ask call to driver below will fail because it returns the response that mechanic
-        # sends (see http://godaddy.github.io/Thespian/doc/using.html#sec-6-6-1).
-        self.actor_system.ask(self.mechanic, mechanic.OnBenchmarkStart(lap))
-        logger.info("Asking driver to start benchmark.")
-        main_driver = self.actor_system.createActor(driver.DriverActor,
-                                                    targetActorRequirements={"coordinator": True},
-                                                    globalName="/rally/driver/coordinator")
-        try:
-            result = self.actor_system.ask(main_driver,
-                                           driver.StartBenchmark(self.cfg, self.race.track, self.metrics_store.meta_info, lap))
-        except KeyboardInterrupt:
-            logger.info("User has cancelled the benchmark.")
-            self.actor_system.send(main_driver, driver.BenchmarkCancelled())
-            return False
-        finally:
-            logger.info("Race control has received a benchmark result message. Terminating main driver actor.")
-            import thespian.actors
-            self.actor_system.tell(main_driver, thespian.actors.ActorExitRequest())
-
-        if isinstance(result, driver.BenchmarkComplete):
-            logger.info("Benchmark is complete.")
-            logger.info("Bulk adding request metrics to metrics store.")
-            self.metrics_store.bulk_add(result.metrics)
-            stop_result = self.actor_system.ask(self.mechanic, mechanic.OnBenchmarkStop())
-            if isinstance(stop_result, mechanic.BenchmarkStopped):
-                logger.info("Bulk adding system metrics to metrics store.")
-                self.metrics_store.bulk_add(stop_result.system_metrics)
-            else:
-                raise exceptions.RallyError("Mechanic has returned no metrics but instead [%s]. Terminating race without result." %
-                                            str(stop_result))
-
-            logger.info("Flushing metrics data...")
-            self.metrics_store.flush()
-            logger.info("Flushing done")
-        # may happen if one of the load generators has detected that the user has cancelled the benchmark.
-        elif isinstance(result, driver.BenchmarkCancelled):
-            logger.info("User has cancelled the benchmark.")
-            return False
-        elif isinstance(result, actor.BenchmarkFailure):
-            logger.info("Driver has reported a benchmark failure.")
-            raise exceptions.RallyError(result.message, result.cause)
-        else:
-            raise exceptions.RallyError("Driver has returned no metrics but instead [%s]. Terminating race without result." % str(result))
-        return True
-
-    def teardown(self, cancelled=False, error=False):
+        logger.info("Telling mechanic of benchmark start.")
+        self.send(self.mechanic, mechanic.OnBenchmarkStart(lap))
+        self.main_driver = self.createActor(driver.DriverActor,
+                                            targetActorRequirements={"coordinator": True},
+                                            globalName="/rally/driver/coordinator")
+        logger.info("Telling driver to start benchmark.")
+        self.send(self.main_driver, driver.StartBenchmark(self.cfg, self.race.track, self.metrics_store.meta_info, lap))
+
+    def teardown(self):
         logger.info("Asking mechanic to stop the engine.")
-        result = self.actor_system.ask(self.mechanic, mechanic.StopEngine())
-        if isinstance(result, mechanic.EngineStopped):
-            logger.info("Mechanic has stopped engine successfully.")
-            logger.info("Bulk adding system metrics to metrics store.")
-            self.metrics_store.bulk_add(result.system_metrics)
-        elif isinstance(result, actor.BenchmarkFailure):
-            logger.info("Stopping engine has failed. Reason [%s]." % result.message)
-            raise exceptions.RallyError(result.message, result.cause)
-        else:
-            raise exceptions.RallyError("Mechanic has not stopped engine but instead [%s]. Terminating race without result." % str(result))
-
-        self.metrics_store.flush()
-        if not cancelled and not error:
-            final_results = reporter.calculate_results(self.metrics_store, self.race)
-            self.race.add_final_results(final_results)
-            reporter.summarize(self.race, self.cfg)
-            self.race_store.store_race(self.race)
-        else:
-            logger.info("Suppressing output of summary report. Cancelled = [%r], Error = [%r]."
-                        % (cancelled, error))
-        self.metrics_store.close()
+        self.send(self.mechanic, mechanic.StopEngine())
 
 
 class LapCounter:
@@ -217,26 +225,31 @@ def __init__(self, current_race, metrics_store, cfg):
         self.lap_timer = time.Clock.stop_watch()
         self.lap_timer.start()
         self.lap_times = 0
+        self.current_lap = 0
 
-    def before_lap(self, lap):
-        logger.info("Starting lap [%d/%d]" % (lap, self.race.total_laps))
+    def has_more_laps(self):
+        return self.current_lap < self.race.total_laps
+
+    def before_lap(self):
+        self.current_lap += 1
+        logger.info("Starting lap [%d/%d]" % (self.current_lap, self.race.total_laps))
         if self.race.total_laps > 1:
-            msg = "Lap [%d/%d]" % (lap, self.race.total_laps)
+            msg = "Lap [%d/%d]" % (self.current_lap, self.race.total_laps)
             console.println(console.format.bold(msg))
             console.println(console.format.underline_for(msg))
 
-    def after_lap(self, lap):
-        logger.info("Finished lap [%d/%d]" % (lap, self.race.total_laps))
+    def after_lap(self):
+        logger.info("Finished lap [%d/%d]" % (self.current_lap, self.race.total_laps))
         if self.race.total_laps > 1:
             lap_time = self.lap_timer.split_time() - self.lap_times
             self.lap_times += lap_time
             hl, ml, sl = convert.seconds_to_hour_minute_seconds(lap_time)
-            lap_results = reporter.calculate_results(self.metrics_store, self.race, lap)
+            lap_results = reporter.calculate_results(self.metrics_store, self.race, self.current_lap)
             self.race.add_lap_results(lap_results)
-            reporter.summarize(self.race, self.cfg, lap=lap)
+            reporter.summarize(self.race, self.cfg, lap=self.current_lap)
             console.println("")
-            if lap < self.race.total_laps:
-                remaining = (self.race.total_laps - lap) * self.lap_times / lap
+            if self.current_lap < self.race.total_laps:
+                remaining = (self.race.total_laps - self.current_lap) * self.lap_times / self.current_lap
                 hr, mr, sr = convert.seconds_to_hour_minute_seconds(remaining)
                 console.info("Lap time %02d:%02d:%02d (ETA: %02d:%02d:%02d)" % (hl, ml, sl, hr, mr, sr), logger=logger)
             else:
@@ -244,26 +257,30 @@ def after_lap(self, lap):
         console.println("")
 
 
-def race(benchmark):
-    cfg = benchmark.cfg
-    laps = benchmark.race.total_laps
-    benchmark.setup()
-    lap_counter = LapCounter(benchmark.race, benchmark.metrics_store, cfg)
-    cancelled = False
-    error = True
+def race(cfg, sources=False, build=False, distribution=False, external=False, docker=False):
+    # at this point an actor system has to run and we should only join
+    actor_system = actor.bootstrap_actor_system(try_join=True)
+    benchmark_actor = actor_system.createActor(BenchmarkActor, targetActorRequirements={"coordinator": True})
     try:
-        for lap in range(1, laps + 1):
-            lap_counter.before_lap(lap)
-            may_continue = benchmark.run(lap)
-            if may_continue:
-                lap_counter.after_lap(lap)
-            else:
-                cancelled = True
-                # Early termination due to cancellation by the user
-                break
-        error = False
+        result = actor_system.ask(benchmark_actor, Setup(cfg, sources, build, distribution, external, docker))
+        if isinstance(result, Success):
+            logger.info("Benchmark has finished successfully.")
+        # may happen if one of the load generators has detected that the user has cancelled the benchmark.
+        elif isinstance(result, actor.BenchmarkCancelled):
+            logger.info("User has cancelled the benchmark.")
+        elif isinstance(result, actor.BenchmarkFailure):
+            logger.error("A benchmark failure has occurred")
+            raise exceptions.RallyError(result.message, result.cause)
+        else:
+            raise exceptions.RallyError("Got an unexpected result during benchmarking: [%s]." % str(result))
+    except KeyboardInterrupt:
+        logger.info("User has cancelled the benchmark.")
+        # notify the coordinator so it can properly handle this state. Do it blocking so we don't have a race between this message
+        # and the actor exit request.
+        actor_system.ask(benchmark_actor, actor.BenchmarkCancelled())
     finally:
-        benchmark.teardown(cancelled, error)
+        logger.info("Telling benchmark actor to exit.")
+        actor_system.tell(benchmark_actor, thespian.actors.ActorExitRequest())
 
 
 def set_default_hosts(cfg, host="127.0.0.1", port=9200):
@@ -279,19 +296,19 @@ def set_default_hosts(cfg, host="127.0.0.1", port=9200):
 def from_sources_complete(cfg):
     port = cfg.opts("provisioning", "node.http.port")
     set_default_hosts(cfg, port=port)
-    return race(Benchmark(cfg, sources=True, build=True))
+    return race(cfg, sources=True, build=True)
 
 
 def from_sources_skip_build(cfg):
     port = cfg.opts("provisioning", "node.http.port")
     set_default_hosts(cfg, port=port)
-    return race(Benchmark(cfg, sources=True, build=False))
+    return race(cfg, sources=True, build=False)
 
 
 def from_distribution(cfg):
     port = cfg.opts("provisioning", "node.http.port")
     set_default_hosts(cfg, port=port)
-    return race(Benchmark(cfg, distribution=True))
+    return race(cfg, distribution=True)
 
 
 def benchmark_only(cfg):
@@ -299,12 +316,12 @@ def benchmark_only(cfg):
     set_default_hosts(cfg)
     # We'll use a special car name for external benchmarks.
     cfg.add(config.Scope.benchmark, "mechanic", "car.name", "external")
-    return race(Benchmark(cfg, external=True))
+    return race(cfg, external=True)
 
 
 def docker(cfg):
     set_default_hosts(cfg)
-    return race(Benchmark(cfg, docker=True))
+    return race(cfg, docker=True)
 
 
 Pipeline("from-sources-complete",
diff --git a/esrally/track/loader.py b/esrally/track/loader.py
index 2f68a927d..683d94acf 100644
--- a/esrally/track/loader.py
+++ b/esrally/track/loader.py
@@ -73,7 +73,7 @@ def load_track(cfg):
         included_tasks = cfg.opts("track", "include.tasks")
 
         current_track = reader.read(track_name, track_file(repo, track_name), track_dir(repo, track_name),
-                                     os.path.join(data_root, track_name.lower()))
+                                    os.path.join(data_root, track_name.lower()))
         current_track = filter_included_tasks(current_track, filters_from_included_tasks(included_tasks))
         if cfg.opts("track", "test.mode.enabled"):