Skip to content

Commit

Permalink
Run tasks indefinitely
Browse files Browse the repository at this point in the history
With this commit we allow to run tasks indefinitely. This is only useful
if used in a parallel element and having another task specified in the
"completed-by" property.

Closes #318
  • Loading branch information
danielmitterdorfer committed Sep 12, 2017
1 parent d0742f4 commit d7ae468
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
37 changes: 28 additions & 9 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,14 @@ def update_progress_message(self, task_finished=False):
if task_finished:
total_progress = 1.0
else:
num_clients = max(len(self.most_recent_sample_per_client), 1)
total_progress = sum([s.percent_completed for s in self.most_recent_sample_per_client.values()]) / num_clients
# we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
# structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
# task that is completing the parallel structure.
progress_per_client = [s.percent_completed
for s in self.most_recent_sample_per_client.values() if s.percent_completed is not None]

num_clients = max(len(progress_per_client), 1)
total_progress = sum(progress_per_client) / num_clients
self.progress_reporter.print("Running %s" % ops, "[%3d%% done]" % (round(total_progress * 100)))
if task_finished:
self.progress_reporter.finish()
Expand Down Expand Up @@ -621,8 +627,12 @@ def receiveMessage(self, msg, sender):
else:
if current_samples and len(current_samples) > 0:
most_recent_sample = current_samples[-1]
logger.info("LoadGenerator[%s] is executing [%s] (%.2f%% complete)." %
(str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0))
if most_recent_sample.percent_completed is not None:
logger.info("LoadGenerator[%s] is executing [%s] (%.2f%% complete)." %
(str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0))
else:
logger.info("LoadGenerator[%s] is executing [%s] (dependent eternal task)." %
(str(self.client_id), most_recent_sample.task))
else:
logger.info("LoadGenerator[%s] is executing (no samples)." % (str(self.client_id)))
self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval))
Expand Down Expand Up @@ -736,6 +746,7 @@ def __init__(self, client_id, absolute_time, relative_time, task, sample_type, r
self.total_ops = total_ops
self.total_ops_unit = total_ops_unit
self.time_period = time_period
# may be None for eternal tasks!
self.percent_completed = percent_completed

@property
Expand Down Expand Up @@ -1255,11 +1266,19 @@ def time_period_based(sched, warmup_time_period, time_period, runner, params):
start = time.perf_counter()
if time_period is None:
iterations = params.size()
for it in range(0, iterations):
sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal
percent_completed = (it + 1) / iterations
yield (next_scheduled, sample_type, percent_completed, runner, params.params())
next_scheduled = sched.next(next_scheduled)
if iterations:
for it in range(0, iterations):
sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal
percent_completed = (it + 1) / iterations
yield (next_scheduled, sample_type, percent_completed, runner, params.params())
next_scheduled = sched.next(next_scheduled)
else:
while True:
sample_type = metrics.SampleType.Warmup if time.perf_counter() - start < warmup_time_period else metrics.SampleType.Normal
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = None
yield (next_scheduled, sample_type, percent_completed, runner, params.params())
next_scheduled = sched.next(next_scheduled)
else:
end = start + warmup_time_period + time_period
it = 0
Expand Down
4 changes: 2 additions & 2 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ def size(self):
In the former case, return just 1. In the latter case, you should determine the number of times that `#params()` will be invoked.
With that number, Rally can show the progress made so far to the user.
:return: The "size" of this parameter source.
:return: The "size" of this parameter source or ``None`` if should run eternally.
"""
return 1
return None

def params(self):
"""
Expand Down
32 changes: 26 additions & 6 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def partition(self, partition_index, total_partitions):
return self

def size(self):
return self._params["size"] if "size" in self._params else 1
return self._params["size"] if "size" in self._params else None

def params(self):
return self._params
Expand Down Expand Up @@ -177,16 +177,19 @@ def test_client_reaches_join_point_which_completes_parent(self, wait_for_status,


class ScheduleTestCase(TestCase):
def assert_schedule(self, expected_schedule, schedule):
def assert_schedule(self, expected_schedule, schedule, eternal_schedule=False):
idx = 0
for invocation_time, sample_type, progress_percent, runner, params in schedule:
exp_invocation_time, exp_sample_type, exp_progress_percent, exp_params = expected_schedule[idx]
self.assertAlmostEqual(exp_invocation_time, invocation_time, msg="Expected invocation time does not match")
self.assertEqual(exp_sample_type, sample_type, "Sample type does not match")
self.assertEqual(exp_progress_percent, progress_percent, "Current progress does not match")
self.assertAlmostEqual(exp_invocation_time, invocation_time, msg="Invocation time for sample at index %d does not match" % idx)
self.assertEqual(exp_sample_type, sample_type, "Sample type for sample at index %d does not match" % idx)
self.assertEqual(exp_progress_percent, progress_percent, "Current progress for sample at index %d does not match" % idx)
self.assertIsNotNone(runner, "runner must be defined")
self.assertEqual(exp_params, params, "Parameters do not match")
idx += 1
# for eternal schedules we only check the first few elements
if eternal_schedule and idx == len(expected_schedule):
break


class AllocatorTests(TestCase):
Expand Down Expand Up @@ -520,6 +523,21 @@ def test_schedule_for_warmup_time_based(self):
(10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "size": 11}),
], list(invocations))

def test_eternal_schedule(self):
task = track.Task(track.Operation("time-based", track.OperationType.Index.name, params={"body": ["a"]},
param_source="driver-test-param-source"),
warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4})

invocations = driver.schedule_for(self.test_track, task, 0)

self.assert_schedule([
(0.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(1.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(2.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(3.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(4.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
], invocations, eternal_schedule=True)

def test_schedule_for_time_based(self):
task = track.Task(track.Operation("time-based", track.OperationType.Index.name, params={"body": ["a"], "size": 11},
param_source="driver-test-param-source"), warmup_time_period=0.1, time_period=0.1, clients=1)
Expand Down Expand Up @@ -580,7 +598,9 @@ def test_execute_schedule_in_throughput_mode(self, es):
task = track.Task(track.Operation("time-based", track.OperationType.Index.name, params={
"body": ["action_metadata_line", "index_line"],
"action_metadata_present": True,
"bulk-size": 1
"bulk-size": 1,
# we need this because DriverTestParamSource does not know that we only have one bulk and hence size() returns incorrect results
"size": 1
},
param_source="driver-test-param-source"),
warmup_time_period=0, clients=4)
Expand Down

0 comments on commit d7ae468

Please sign in to comment.