Improve support for benchmarks with many clients (#1044)

With this commit we emit some log messages only once per task (instead
of once per simulated client). These messages were completely repetitive
and were sent at almost the same time. As each log message is sent as an
actor system message to thespian's logging actor, this caused a storm of
messages that filled up the logging actor's internal message queue and
led to dropped messages.
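
The gist of the change: guard repetitive log statements with the client
index so that only the first simulated client emits them. A minimal sketch
of the pattern, assuming a hypothetical run_task function (this is not
Rally's actual code; see the diff below):

    import logging

    logger = logging.getLogger(__name__)

    def run_task(task, client_index):
        # Log once per task: only the first simulated client emits the message.
        # Every other client would log the identical text at almost the same
        # time and flood the logging actor's internal message queue.
        if client_index == 0:
            logger.info("Executing [%s].", task)
        # ... task execution itself still happens for every client ...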

We also increase the size of Rally's internal queue that stores
measurement samples from 2^16 (65,536) to 2^20 (1,048,576) entries and
make this setting configurable via Rally's configuration file. This
allows Rally to buffer more measurement samples when a benchmark is
configured with many clients.
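
As the diff below shows, the setting is read from the "system" section of
Rally's configuration file (by default ~/.rally/rally.ini) and falls back
to 1 << 20 samples when it is absent. A hypothetical excerpt that doubles
the buffer once more:

    [system]
    sample.queue.size = 2097152
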
danielmitterdorfer authored Jul 31, 2020
1 parent 3abeff0 commit f126be8
Showing 1 changed file with 20 additions and 17 deletions.
esrally/driver/driver.py (20 additions, 17 deletions)

@@ -818,6 +818,7 @@ def __init__(self):
         self.sampler = None
         self.start_driving = False
         self.wakeup_interval = Worker.WAKEUP_INTERVAL_SECONDS
+        self.sample_queue_size = None
 
     @actor.no_retry("worker")
     def receiveMsg_StartWorker(self, msg, sender):
@@ -826,6 +827,7 @@ def receiveMsg_StartWorker(self, msg, sender):
         self.worker_id = msg.worker_id
         self.config = load_local_config(msg.config)
         self.on_error = self.config.opts("driver", "on.error")
+        self.sample_queue_size = int(self.config.opts("system", "sample.queue.size", mandatory=False, default_value=1 << 20))
         self.track = msg.track
         track.set_absolute_data_path(self.config, self.track)
         self.client_allocations = msg.client_allocations
@@ -935,8 +937,7 @@ def drive(self):
                              "tasks until next join point.", self.worker_id, self.current_task_index)
         else:
             self.logger.info("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index)
-            # allow to buffer more events than by default as we expect to have way more clients.
-            self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=65536)
+            self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=self.sample_queue_size)
             executor = AsyncIoAdapter(self.config, self.track, task_allocations, self.sampler,
                                       self.cancel, self.complete, self.on_error)
 
@@ -1625,15 +1626,19 @@ def schedule_for(task, client_index, parameter_source):
     op = task.operation
     num_clients = task.clients
     sched = scheduler.scheduler_for(task.schedule, task.params)
-    logger.info("Choosing [%s] for [%s].", sched, task)
+    # guard all logging statements with the client index and only emit them for the first client. This information is
+    # repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
+    if client_index == 0:
+        logger.info("Choosing [%s] for [%s].", sched, task)
     runner_for_op = runner.runner_for(op.type)
     params_for_op = parameter_source.partition(client_index, num_clients)
 
     if requires_time_period_schedule(task, runner_for_op, params_for_op):
         warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
-        logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
-                    "seconds and a time period of [%s] seconds.", task.schedule, task.name,
-                    str(warmup_time_period), str(task.time_period))
+        if client_index == 0:
+            logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
+                        "seconds and a time period of [%s] seconds.", task.schedule, task.name,
+                        str(warmup_time_period), str(task.time_period))
         loop_control = TimePeriodBased(warmup_time_period, task.time_period)
     else:
         warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
@@ -1644,10 +1649,17 @@ def schedule_for(task, client_index, parameter_source):
             iterations = 1
         else:
             iterations = None
-        logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
-                    "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations))
+        if client_index == 0:
+            logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup "
+                        "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations))
         loop_control = IterationBased(warmup_iterations, iterations)
 
+    if client_index == 0:
+        if loop_control.infinite:
+            logger.info("Parameter source will determine when the schedule for [%s] terminates.", task.name)
+        else:
+            logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)
+
     return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
 
 
@@ -1681,7 +1693,6 @@ def __init__(self, task_name, sched, task_progress_control, runner, params):
         self.task_progress_control = task_progress_control
         self.runner = runner
         self.params = params
-        self.logger = logging.getLogger(__name__)
         # TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight?
         #from concurrent.futures import ThreadPoolExecutor
         #import asyncio
@@ -1691,7 +1702,6 @@ async def __call__(self):
     async def __call__(self):
         next_scheduled = 0
         if self.task_progress_control.infinite:
-            self.logger.info("Parameter source will determine when the schedule for [%s] terminates.", self.task_name)
             param_source_knows_progress = hasattr(self.params, "percent_completed")
             self.task_progress_control.start()
             while True:
@@ -1704,13 +1714,9 @@ async def __call__(self):
                     next_scheduled = self.sched.next(next_scheduled)
                     self.task_progress_control.next()
                 except StopIteration:
-                    self.logger.info("%s schedule for [%s] stopped due to StopIteration.",
-                                     str(self.task_progress_control), self.task_name)
                     return
         else:
             self.task_progress_control.start()
-            self.logger.info("%s schedule will determine when the schedule for [%s] terminates.",
-                             str(self.task_progress_control), self.task_name)
             while not self.task_progress_control.completed:
                 try:
                     #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
@@ -1722,10 +1728,7 @@ async def __call__(self):
                     next_scheduled = self.sched.next(next_scheduled)
                     self.task_progress_control.next()
                 except StopIteration:
-                    self.logger.info("%s schedule for [%s] stopped due to StopIteration.",
-                                     str(self.task_progress_control), self.task_name)
                     return
-        self.logger.info("%s schedule for [%s] stopped regularly.", str(self.task_progress_control), self.task_name)
 
 
 class TimePeriodBased:
