Skip to content

Commit

Permalink
Turn benchmark coordinator into an actor
Browse files Browse the repository at this point in the history
To implement #278, the benchmark coordinator needs to be a proper actor.
This allows us to send messages between driver and mechanic via the
coordinator without complicating the control flow.

Relates #278
  • Loading branch information
danielmitterdorfer committed Aug 16, 2017
1 parent 524761a commit e519d9a
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 162 deletions.
7 changes: 7 additions & 0 deletions esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ def __init__(self, message, cause=None):
self.cause = cause


class BenchmarkCancelled:
"""
Indicates that the benchmark has been cancelled (by the user).
"""
pass


class RallyActor(thespian.actors.Actor):
def __init__(self):
super().__init__()
Expand Down
2 changes: 1 addition & 1 deletion esrally/driver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# expose only the minimum API
from .driver import DriverActor, StartBenchmark, BenchmarkComplete, BenchmarkCancelled
from .driver import DriverActor, StartBenchmark, BenchmarkComplete
11 changes: 2 additions & 9 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,6 @@ def __init__(self, metrics):
self.metrics = metrics


class BenchmarkCancelled:
"""
Indicates that the benchmark has been cancelled (by the user).
"""
pass


class DriverActor(actor.RallyActor):
WAKEUP_INTERVAL_SECONDS = 1
"""
Expand Down Expand Up @@ -142,7 +135,7 @@ def receiveMessage(self, msg, sender):
logger.error("Main driver received a fatal exception from a load generator. Shutting down.")
self.coordinator.close()
self.send(self.start_sender, msg)
elif isinstance(msg, BenchmarkCancelled):
elif isinstance(msg, actor.BenchmarkCancelled):
logger.info("Main driver received a notification that the benchmark has been cancelled.")
self.coordinator.close()
self.send(self.start_sender, msg)
Expand Down Expand Up @@ -491,7 +484,7 @@ def receiveMessage(self, msg, sender):
if self.cancel.is_set():
logger.info("LoadGenerator[%s] has detected that benchmark has been cancelled. Notifying master..." %
str(self.client_id))
self.send(self.master, BenchmarkCancelled())
self.send(self.master, actor.BenchmarkCancelled())
elif self.executor_future is not None and self.executor_future.done():
e = self.executor_future.exception(timeout=0)
if e:
Expand Down
Loading

0 comments on commit e519d9a

Please sign in to comment.