Cleanup actor system code (#1749)

* Remove unused function
* Rename aws to awaitables

While the asyncio documentation uses variables named `aws`, I always
struggle to remember what that means in the context of Rally (a short
sketch follows this list).

* Rename internal variables for greater clarity
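
To unpack the `aws` → `awaitables` rename: `aws` is the abbreviation the asyncio documentation uses for "awaitables", the objects `asyncio.gather()` accepts. A minimal, self-contained sketch of the renamed pattern — illustrative only, not Rally code:

```python
import asyncio


async def run_client(name: str) -> str:
    # stand-in for Rally's per-client executor coroutine
    await asyncio.sleep(0.01)
    return name


async def main() -> None:
    # one coroutine per client; "awaitables" states what the list holds,
    # while the docs' abbreviation "aws" has to be decoded first
    awaitables = [run_client(f"client-{i}") for i in range(3)]
    print(await asyncio.gather(*awaitables))


asyncio.run(main())
```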

Co-authored-by: Gareth Ellis <[email protected]>
pquentin and gareth-ellis authored Aug 14, 2023
1 parent bb51612 commit 0720789
Showing 2 changed files with 56 additions and 60 deletions.
114 changes: 55 additions & 59 deletions esrally/driver/driver.py
@@ -226,40 +226,40 @@ class DriverActor(actor.RallyActor):
 
     def __init__(self):
         super().__init__()
-        self.start_sender = None
-        self.coordinator = None
+        self.benchmark_actor = None
+        self.driver = None
         self.status = "init"
         self.post_process_timer = 0
         self.cluster_details = {}
 
     def receiveMsg_PoisonMessage(self, poisonmsg, sender):
         self.logger.error("Main driver received a fatal indication from a load generator (%s). Shutting down.", poisonmsg.details)
-        self.coordinator.close()
-        self.send(self.start_sender, actor.BenchmarkFailure("Fatal track or load generator indication", poisonmsg.details))
+        self.driver.close()
+        self.send(self.benchmark_actor, actor.BenchmarkFailure("Fatal track or load generator indication", poisonmsg.details))
 
     def receiveMsg_BenchmarkFailure(self, msg, sender):
         self.logger.error("Main driver received a fatal exception from a load generator. Shutting down.")
-        self.coordinator.close()
-        self.send(self.start_sender, msg)
+        self.driver.close()
+        self.send(self.benchmark_actor, msg)
 
     def receiveMsg_BenchmarkCancelled(self, msg, sender):
         self.logger.info("Main driver received a notification that the benchmark has been cancelled.")
-        self.coordinator.close()
-        self.send(self.start_sender, msg)
+        self.driver.close()
+        self.send(self.benchmark_actor, msg)
 
     def receiveMsg_ActorExitRequest(self, msg, sender):
         self.logger.info("Main driver received ActorExitRequest and will terminate all load generators.")
         self.status = "exiting"
 
     def receiveMsg_ChildActorExited(self, msg, sender):
         # is it a worker?
-        if msg.childAddress in self.coordinator.workers:
-            worker_index = self.coordinator.workers.index(msg.childAddress)
+        if msg.childAddress in self.driver.workers:
+            worker_index = self.driver.workers.index(msg.childAddress)
             if self.status == "exiting":
                 self.logger.debug("Worker [%d] has exited.", worker_index)
             else:
                 self.logger.error("Worker [%d] has exited prematurely. Aborting benchmark.", worker_index)
-                self.send(self.start_sender, actor.BenchmarkFailure(f"Worker [{worker_index}] has exited prematurely."))
+                self.send(self.benchmark_actor, actor.BenchmarkFailure(f"Worker [{worker_index}] has exited prematurely."))
         else:
             self.logger.debug("A track preparator has exited.")
 
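The handler names in this hunk follow Thespian's type-based dispatch: assuming `actor.RallyActor` builds on Thespian's `ActorTypeDispatcher`, each message is routed to `receiveMsg_<TypeName>` and the second parameter is always the sending actor's address, which is why this commit can freely rename `sender` to names like `track_preparation_actor`. A minimal standalone sketch under that assumption — not Rally code:

```python
from dataclasses import dataclass

from thespian.actors import ActorSystem, ActorTypeDispatcher


@dataclass
class Ping:
    payload: str


class EchoActor(ActorTypeDispatcher):
    # Thespian dispatches on the message's type name; the second parameter
    # is always the sending actor's address, so its name is free documentation
    def receiveMsg_Ping(self, msg, sender):
        self.send(sender, f"echo: {msg.payload}")


if __name__ == "__main__":
    system = ActorSystem()  # simple in-process actor system by default
    try:
        echo = system.createActor(EchoActor)
        print(system.ask(echo, Ping("hello"), timeout=5))  # "echo: hello"
    finally:
        system.shutdown()
```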
@@ -268,46 +268,44 @@ def receiveUnrecognizedMessage(self, msg, sender):
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
     def receiveMsg_PrepareBenchmark(self, msg, sender):
-        self.start_sender = sender
-        self.coordinator = Driver(self, msg.config)
-        self.coordinator.prepare_benchmark(msg.track)
+        self.benchmark_actor = sender
+        self.driver = Driver(self, msg.config)
+        self.driver.prepare_benchmark(msg.track)
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
     def receiveMsg_StartBenchmark(self, msg, sender):
-        self.start_sender = sender
-        self.coordinator.start_benchmark()
+        self.benchmark_actor = sender
+        self.driver.start_benchmark()
         self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
-    def receiveMsg_TrackPrepared(self, msg, sender):
+    def receiveMsg_TrackPrepared(self, msg, track_preparation_actor):
         self.transition_when_all_children_responded(
-            sender, msg, expected_status=None, new_status=None, transition=self._after_track_prepared
+            track_preparation_actor, msg, expected_status=None, new_status=None, transition=self._after_track_prepared
         )
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
     def receiveMsg_JoinPointReached(self, msg, sender):
-        self.coordinator.joinpoint_reached(msg.worker_id, msg.worker_timestamp, msg.task)
+        self.driver.joinpoint_reached(msg.worker_id, msg.worker_timestamp, msg.task)
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
     def receiveMsg_UpdateSamples(self, msg, sender):
-        self.coordinator.update_samples(msg.samples)
+        self.driver.update_samples(msg.samples)
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
     def receiveMsg_WakeupMessage(self, msg, sender):
         if msg.payload == DriverActor.RESET_RELATIVE_TIME_MARKER:
-            self.coordinator.reset_relative_time()
-        elif not self.coordinator.finished():
+            self.driver.reset_relative_time()
+        elif not self.driver.finished():
             self.post_process_timer += DriverActor.WAKEUP_INTERVAL_SECONDS
             if self.post_process_timer >= DriverActor.POST_PROCESS_INTERVAL_SECONDS:
                 self.post_process_timer = 0
-                self.coordinator.post_process_samples()
-            self.coordinator.update_progress_message()
+                self.driver.post_process_samples()
+            self.driver.update_progress_message()
             self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))
 
     def create_client(self, host, cfg):
-        worker = self.createActor(Worker, targetActorRequirements=self._requirements(host))
-        self.send(worker, Bootstrap(cfg))
-        return worker
+        return self.createActor(Worker, targetActorRequirements=self._requirements(host))
 
     def start_worker(self, driver, worker_id, cfg, track, allocations, client_contexts=None):
         self.send(driver, StartWorker(worker_id, cfg, track, allocations, client_contexts))
@@ -322,8 +320,8 @@ def on_task_finished(self, metrics, next_task_scheduled_in):
         if next_task_scheduled_in > 0:
             self.wakeupAfter(datetime.timedelta(seconds=next_task_scheduled_in), payload=DriverActor.RESET_RELATIVE_TIME_MARKER)
         else:
-            self.coordinator.reset_relative_time()
-        self.send(self.start_sender, TaskFinished(metrics, next_task_scheduled_in))
+            self.driver.reset_relative_time()
+        self.send(self.benchmark_actor, TaskFinished(metrics, next_task_scheduled_in))
 
     def _requirements(self, host):
         if host == "localhost":
@@ -340,9 +338,9 @@ def prepare_track(self, hosts, cfg, track):
             self.send(child, msg)
 
     @actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
-    def receiveMsg_ReadyForWork(self, msg, sender):
+    def receiveMsg_ReadyForWork(self, msg, task_preparation_actor):
         msg = PrepareTrack(self.track)
-        self.send(sender, msg)
+        self.send(task_preparation_actor, msg)
 
     def _create_track_preparator(self, host):
         return self.createActor(TrackPreparationActor, targetActorRequirements=self._requirements(host))
@@ -359,7 +357,7 @@ def _after_track_prepared(self):
             self.send(child, thespian.actors.ActorExitRequest())
         self.children = []
         self.send(
-            self.start_sender,
+            self.benchmark_actor,
             PreparationComplete(
                 build_flavor,
                 build_version,
@@ -368,7 +366,7 @@ def _after_track_prepared(self):
         )
 
     def on_benchmark_complete(self, metrics):
-        self.send(self.start_sender, BenchmarkComplete(metrics))
+        self.send(self.benchmark_actor, BenchmarkComplete(metrics))
 
 
 def load_local_config(coordinator_config):
@@ -399,33 +397,36 @@ def __init__(self):
         self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
         self.executor_future = None
         self.wakeup_interval = 5
-        self.parent = None
+        self.task_preparation_actor = None
         self.logger = logging.getLogger(__name__)
         self.track_name = None
         self.cfg = None
 
     @actor.no_retry("task executor")  # pylint: disable=no-value-for-parameter
     def receiveMsg_StartTaskLoop(self, msg, sender):
-        self.parent = sender
+        self.task_preparation_actor = sender
         self.track_name = msg.track_name
         self.cfg = load_local_config(msg.cfg)
         if self.cfg.opts("track", "test.mode.enabled"):
             self.wakeup_interval = 0.5
         track.load_track_plugins(self.cfg, self.track_name)
-        self.send(self.parent, ReadyForWork())
+        self.send(self.task_preparation_actor, ReadyForWork())
 
     @actor.no_retry("task executor")  # pylint: disable=no-value-for-parameter
     def receiveMsg_DoTask(self, msg, sender):
         # actor can arbitrarily execute code based on these messages. if anyone besides our parent sends a task, ignore
-        if sender != self.parent:
-            msg = f"TaskExecutionActor expected message from [{self.parent}] but the received the following from [{sender}]: {vars(msg)}"
+        if sender != self.task_preparation_actor:
+            msg = (
+                f"TaskExecutionActor expected message from [{self.task_preparation_actor}]"
+                f" but the received the following from [{sender}]: {vars(msg)}"
+            )
             raise exceptions.RallyError(msg)
         task = msg.task
         if self.executor_future is not None:
             msg = f"TaskExecutionActor received DoTask message [{vars(msg)}], but was already busy"
             raise exceptions.RallyError(msg)
         if task is None:
-            self.send(self.parent, WorkerIdle())
+            self.send(self.task_preparation_actor, WorkerIdle())
         else:
             # this is a potentially long-running operation so we offload it a background thread so we don't block
             # the actor (e.g. logging works properly as log messages are forwarded timely).
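The comment above describes the pattern `TaskExecutionActor` uses: submit the task to its single-threaded pool (`self.pool`) and then poll `self.executor_future` from `receiveMsg_WakeupMessage`. A standalone sketch of that offload-and-poll loop, with plain `time.sleep` standing in for the actor's `wakeupAfter` timer — not Rally code:

```python
import concurrent.futures
import time


def blocking_task() -> str:
    time.sleep(0.2)  # stand-in for a long-running track preparation step
    return "done"


pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = pool.submit(blocking_task)  # the actor's message loop stays responsive

while not future.done():  # the actor does this check on each WakeupMessage
    time.sleep(0.05)  # stand-in for wakeupAfter(wakeup_interval)

error = future.exception()
if error is not None:
    # the actor would send BenchmarkFailure("Error in task executor", str(error))
    print(f"task failed: {error}")
else:
    print(future.result())  # the actor would send ReadyForWork() for the next task
pool.shutdown()
```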
@@ -440,16 +441,16 @@ def receiveMsg_WakeupMessage(self, msg, sender):
                 self.logger.exception("Worker failed. Notifying parent...", exc_info=e)
                 # the exception might be user-defined and not be on the load path of the original sender. Hence, it
                 # cannot be deserialized on the receiver so we convert it here to a plain string.
-                self.send(self.parent, actor.BenchmarkFailure("Error in task executor", str(e)))
+                self.send(self.task_preparation_actor, actor.BenchmarkFailure("Error in task executor", str(e)))
             else:
                 self.executor_future = None
-                self.send(self.parent, ReadyForWork())
+                self.send(self.task_preparation_actor, ReadyForWork())
         else:
             self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval))
 
     def receiveMsg_BenchmarkFailure(self, msg, sender):
         # sent by our no_retry infrastructure; forward to master
-        self.send(self.parent, msg)
+        self.send(self.task_preparation_actor, msg)
 
 
 class TrackPreparationActor(actor.RallyActor):
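Several handlers in this diff are wrapped in `@actor.no_retry(...)`, and the comment "sent by our no_retry infrastructure" hints at its job: rather than letting Thespian re-deliver a message whose handler raised, the failure is converted into a `BenchmarkFailure` message that actors forward up the tree. A hypothetical sketch of such a guard, assuming Rally-like names (`no_retry`, `BenchmarkFailure`) — not Rally's actual implementation:

```python
import functools
import logging
from dataclasses import dataclass


@dataclass
class BenchmarkFailure:
    message: str
    cause: str


def no_retry(activity):
    # hypothetical sketch: turn handler exceptions into a BenchmarkFailure
    # message so the actor system does not retry the original message
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(self, msg, sender):
            try:
                handler(self, msg, sender)
            except Exception as e:
                logging.getLogger(__name__).exception("Error in %s", activity)
                # notify ourselves; receiveMsg_BenchmarkFailure then forwards
                # the failure upward as a plain, serializable string
                self.send(self.myAddress, BenchmarkFailure(f"Error in {activity}", str(e)))

        return wrapper

    return decorator
```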
@@ -461,7 +462,7 @@ class Status(Enum):
     def __init__(self):
         super().__init__()
         self.processors = queue.Queue()
-        self.original_sender = None
+        self.driver_actor = None
         self.logger.info("Track Preparator started")
         self.status = self.Status.INITIALIZING
         self.children = []
@@ -472,15 +473,16 @@ def __init__(self):
 
     def receiveMsg_PoisonMessage(self, poisonmsg, sender):
         self.logger.error("Track Preparator received a fatal indication from a load generator (%s). Shutting down.", poisonmsg.details)
-        self.send(self.original_sender, actor.BenchmarkFailure("Fatal track preparation indication", poisonmsg.details))
+        self.send(self.driver_actor, actor.BenchmarkFailure("Fatal track preparation indication", poisonmsg.details))
 
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
     def receiveMsg_Bootstrap(self, msg, sender):
+        self.driver_actor = sender
         # load node-specific config to have correct paths available
         self.cfg = load_local_config(msg.config)
         # this instance of load_track occurs once per host, so install dependencies if necessary
         load_track(self.cfg, install_dependencies=False)
-        self.send(sender, ReadyForWork())
+        self.send(self.driver_actor, ReadyForWork())
 
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
     def receiveMsg_ActorExitRequest(self, msg, sender):
@@ -491,11 +493,10 @@ def receiveMsg_ActorExitRequest(self, msg, sender):
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
     def receiveMsg_BenchmarkFailure(self, msg, sender):
         # sent by our generic worker; forward to parent
-        self.send(self.original_sender, msg)
+        self.send(self.driver_actor, msg)
 
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
     def receiveMsg_PrepareTrack(self, msg, sender):
-        self.original_sender = sender
         self.data_root_dir = self.cfg.opts("benchmarks", "local.dataset.cache")
         tpr = TrackProcessorRegistry(self.cfg)
         self.track = msg.track
@@ -521,7 +522,7 @@ def resume(self):
                 self, StartTaskLoop(self.track.name, self.cfg), self.Status.PROCESSOR_COMPLETE, self.Status.PROCESSOR_RUNNING
             )
         else:
-            self.send(self.original_sender, TrackPrepared())
+            self.send(self.driver_actor, TrackPrepared())
 
     def _seed_tasks(self, processor):
         self.tasks = list(WorkerTask(func, params) for func, params in processor.on_prepare_track(self.track, self.data_root_dir))
@@ -530,14 +531,14 @@ def _create_task_executor(self):
         return self.createActor(TaskExecutionActor)
 
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
-    def receiveMsg_ReadyForWork(self, msg, sender):
+    def receiveMsg_ReadyForWork(self, msg, task_execution_actor):
         if self.tasks:
             next_task = self.tasks.pop()
         else:
             next_task = None
         new_msg = DoTask(next_task, self.cfg)
-        self.logger.debug("Track Preparator sending %s to %s", vars(new_msg), sender)
-        self.send(sender, new_msg)
+        self.logger.debug("Track Preparator sending %s to %s", vars(new_msg), task_execution_actor)
+        self.send(task_execution_actor, new_msg)
 
     @actor.no_retry("track preparator")  # pylint: disable=no-value-for-parameter
     def receiveMsg_WorkerIdle(self, msg, sender):
@@ -1209,11 +1210,6 @@ def __init__(self):
         self.wakeup_interval = Worker.WAKEUP_INTERVAL_SECONDS
         self.sample_queue_size = None
 
-    @actor.no_retry("worker")  # pylint: disable=no-value-for-parameter
-    def receiveMsg_RallyConfig(self, msg, sender):
-        self.config = load_local_config(msg.config)
-        load_track(self.config)
-
     @actor.no_retry("worker")  # pylint: disable=no-value-for-parameter
     def receiveMsg_StartWorker(self, msg, sender):
         self.logger.info("Worker[%d] is about to start.", msg.worker_id)
@@ -1772,7 +1768,7 @@ def es_clients(client_id, all_hosts, all_client_options, distribution_version, d
             runner.enable_assertions(self.assertions_enabled)
 
         clients = []
-        aws = []
+        awaitables = []
         # A parameter source should only be created once per task - it is partitioned later on per client.
         params_per_task = {}
         for client_id, task_allocation in self.task_allocations:
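The comment in this hunk notes why `params_per_task` is a dict: a parameter source is stateful, so one is built per task and each client then receives its own partition of it. A hypothetical, self-contained sketch of that shape — `ParamSource` and `partition` here are illustrative stand-ins, not Rally's API:

```python
# hypothetical sketch of "create once per task, partition per client"
class ParamSource:
    def __init__(self, task_name: str):
        self.task_name = task_name  # stand-in for expensive, stateful setup

    def partition(self, client_id: int, total_clients: int) -> "ParamSource":
        # each client gets its own independent slice of the parameter stream
        print(f"{self.task_name}: slice {client_id}/{total_clients}")
        return self


params_per_task = {}
task_allocations = [(0, "index"), (1, "index"), (0, "search")]
for client_id, task_name in task_allocations:
    if task_name not in params_per_task:
        params_per_task[task_name] = ParamSource(task_name)  # once per task
    client_params = params_per_task[task_name].partition(client_id, 2)
```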
@@ -1793,12 +1789,12 @@ def es_clients(client_id, all_hosts, all_client_options, distribution_version, d
                 client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error)
             )
             final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor
-            aws.append(final_executor())
+            awaitables.append(final_executor())
         task_names = [t.task.task.name for t in self.task_allocations]
         self.logger.info("Worker[%s] executing tasks: %s", self.parent_worker_id, task_names)
         run_start = time.perf_counter()
         try:
-            _ = await asyncio.gather(*aws)
+            _ = await asyncio.gather(*awaitables)
         finally:
             run_end = time.perf_counter()
             self.logger.info(
2 changes: 1 addition & 1 deletion esrally/racecontrol.py
@@ -60,7 +60,7 @@ def __init__(self, name, description, target, stable=True):
         :param name: A short name of the pipeline. This name will be used to reference it from the command line.
         :param description: A human-readable description what the pipeline does.
         :param target: A function that implements this pipeline
-        :param stable True iff the pipeline is considered production quality.
+        :param stable True if the pipeline is considered production quality.
         """
         self.name = name
         self.description = description
