Allow to select a task as termination condition for parallel
Closes #228
danielmitterdorfer committed Jul 18, 2017
1 parent de02443 commit af6eec7
Showing 7 changed files with 448 additions and 78 deletions.
29 changes: 28 additions & 1 deletion docs/track.rst
@@ -273,7 +273,8 @@ All tasks in the ``schedule`` list are executed sequentially in the order in which …
* ``time-period`` (optional, no default value if not specified): Allows defining a default value for all tasks of the ``parallel`` element.
* ``warmup-iterations`` (optional, defaults to 0): Allows defining a default value for all tasks of the ``parallel`` element.
* ``iterations`` (optional, defaults to 1): Allows defining a default value for all tasks of the ``parallel`` element.
- * ``tasks`` (mandatory): Defines a list of tasks that should be executed concurrently. Each task in the list can define the same properties as defined above.
+ * ``completed-by`` (optional): Allows specifying the name of one task in the ``tasks`` list. As soon as this task has completed, the whole ``parallel`` task structure is considered completed. If this property is not explicitly defined, the ``parallel`` task structure is considered completed as soon as all of its subtasks have completed. A task is completed if and only if all associated clients have completed execution.
+ * ``tasks`` (mandatory): Defines a list of tasks that should be executed concurrently. Each task in the list can define the following properties described above: ``clients``, ``warmup-time-period``, ``time-period``, ``warmup-iterations`` and ``iterations``.

.. note::

@@ -371,6 +372,32 @@ In this scenario, we run indexing and a few queries concurrently with a total of …
}
]

We can use ``completed-by`` to stop querying as soon as bulk-indexing has completed::

"schedule": [
{
"parallel": {
"completed-by": "bulk",
"tasks": [
{
"operation": "bulk",
"warmup-time-period": 120,
"time-period": 3600,
"clients": 8,
"target-throughput": 50
},
{
"operation": "default",
"clients": 2,
"warmup-time-period": 480,
"time-period": 7200,
"target-throughput": 50
}
]
}
}
]
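
With ``completed-by`` set to ``bulk``, the ``default`` query task is stopped as soon as the bulk task's clients have finished; its own ``time-period`` of 7200 seconds then acts only as an upper bound and is not exhausted here.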

We can also mix sequential tasks with the ``parallel`` element. In this scenario, we index with 8 clients and continue querying with 6 clients after indexing has finished::

"schedule": [
235 changes: 172 additions & 63 deletions esrally/driver/driver.py

Large diffs are not rendered by default.
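
The driver.py diff is collapsed here, but the test changes further down pin the new ``Executor`` contract: it is constructed with a task, a schedule, a client, a sampler, and a pair of events; it is callable; and it sets the ``complete`` event when its task completes the parent ``parallel`` structure. A minimal sketch reconstructed from those expectations (the loop body and the early exit on ``complete`` are assumptions; only the constructor signature and the event protocol come from the tests)::

    import threading

    class Executor:
        # Sketch inferred from the tests below, not from the collapsed driver.py diff.
        def __init__(self, task, schedule, es, sampler, cancel, complete):
            self.task = task          # a track.Task; completes_parent is the new flag
            self.schedule = schedule  # iterable of scheduled operation invocations
            self.es = es
            self.sampler = sampler
            self.cancel = cancel      # set externally to abort execution
            self.complete = complete  # signals that the parallel structure is done

        def __call__(self):
            for scheduled in self.schedule:
                # stop if cancelled or if a sibling has already completed the parent
                if self.cancel.is_set() or self.complete.is_set():
                    break
                # ... execute `scheduled` against self.es, record via self.sampler ...
            if self.task.completes_parent:
                # tell sibling executors that the whole parallel element is finished
                self.complete.set()

    # Typical wiring, mirroring the tests: one shared cancel/complete pair.
    cancel, complete = threading.Event(), threading.Event()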

4 changes: 4 additions & 0 deletions esrally/resources/track-schema.json
@@ -266,6 +266,10 @@
"minimum": 1,
"description": "Defines the time period in seconds to run the operation. Note that the parameter source may be exhausted before the specified time period has elapsed."
},
"completed-by": {
"type": "string",
"description": "The name of an operation in the 'tasks' block. When this operation is completed, the whole parallel element is considered to be completed."
},
"tasks": {
"type": "array",
"minItems": 1,
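
To sanity-check the new property at the schema level, one can validate a ``parallel`` element against a cut-down schema with the ``jsonschema`` package (a sketch; this mini-schema mirrors only the fragment above, not Rally's full track schema)::

    from jsonschema import validate, ValidationError

    # Illustrative subset of the "parallel" properties shown above.
    parallel_schema = {
        "type": "object",
        "properties": {
            "completed-by": {"type": "string"},
            "tasks": {"type": "array", "minItems": 1}
        },
        "required": ["tasks"]
    }

    # A well-formed parallel element that names the task ending the structure.
    validate(instance={"completed-by": "bulk", "tasks": [{"operation": "bulk"}]},
             schema=parallel_schema)

    # 'completed-by' must be a string naming a task, so this is rejected.
    try:
        validate(instance={"completed-by": 42, "tasks": [{"operation": "bulk"}]},
                 schema=parallel_schema)
    except ValidationError as error:
        print("rejected:", error.message)
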
21 changes: 18 additions & 3 deletions esrally/track/loader.py
@@ -626,16 +626,28 @@ def parse_parallel(self, ops_spec, ops, challenge_name):
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)

# now descend to each operation
tasks = []
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
tasks.append(self.parse_task(task, ops, challenge_name, default_warmup_iterations, default_iterations,
- default_warmup_time_period, default_time_period))
+ default_warmup_time_period, default_time_period, completed_by))
if completed_by:
completion_task = None
for task in tasks:
if task.completes_parent and not completion_task:
completion_task = task
elif task.completes_parent:
self._error("'parallel' element for challenge '%s' contains multiple tasks with the name '%s' which are marked with "
"'completed-by' but only one task is allowed to match." % (challenge_name, completed_by))
if not completion_task:
self._error("'parallel' element for challenge '%s' is marked with 'completed-by' with task name '%s' but no task with "
"this name exists." % (challenge_name, completed_by))
return track.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0, default_iterations=1,
- default_warmup_time_period=None, default_time_period=None):
+ default_warmup_time_period=None, default_time_period=None, completed_by_name=None):
op_name = task_spec["operation"]
if op_name not in ops:
self._error("'schedule' for challenge '%s' contains a non-existing operation '%s'. "
@@ -652,7 +664,10 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=0 …
time_period=self._r(task_spec, "time-period", error_ctx=op_name, mandatory=False,
default_value=default_time_period),
clients=self._r(task_spec, "clients", error_ctx=op_name, mandatory=False, default_value=1),
- schedule=schedule, params=task_spec)
+ # this will work because op_name must always be set, i.e. it is never `None`.
+ completes_parent=(op_name == completed_by_name),
+ schedule=schedule,
+ params=task_spec)
if task.warmup_iterations != default_warmup_iterations and task.time_period is not None:
self._error("Operation '%s' in challenge '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not "
"mix time periods and iterations." % (op_name, challenge_name, task.warmup_iterations, task.time_period))
9 changes: 5 additions & 4 deletions esrally/track/track.py
@@ -353,28 +353,29 @@ def __eq__(self, other):

class Task:
def __init__(self, operation, meta_data=None, warmup_iterations=0, iterations=1, warmup_time_period=None, time_period=None, clients=1,
schedule="deterministic", params=None):
completes_parent=False, schedule="deterministic", params=None):
self.operation = operation
self.meta_data = meta_data if meta_data else {}
self.warmup_iterations = warmup_iterations
self.iterations = iterations
self.warmup_time_period = warmup_time_period
self.time_period = time_period
self.clients = clients
self.completes_parent = completes_parent
self.schedule = schedule
self.params = params if params else {}

def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ hash(self.warmup_time_period) ^ \
- hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule)
+ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent)

def __eq__(self, other):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return isinstance(other, type(self)) and (self.operation, self.warmup_iterations, self.iterations, self.warmup_time_period,
- self.time_period, self.clients, self.schedule) == \
+ self.time_period, self.clients, self.schedule, self.completes_parent) == \
(other.operation, other.warmup_iterations, other.iterations, other.warmup_time_period,
- other.time_period, other.clients, other.schedule)
+ other.time_period, other.clients, other.schedule, other.completes_parent)

def __iter__(self):
return iter([self])
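
Because ``completes_parent`` now participates in ``__hash__`` and ``__eq__``, two tasks that differ only in this flag are distinct. A quick illustration (mirroring the operation fixtures used in the tests below, and assuming the parameter source name is only resolved later, as those fixtures suggest)::

    from esrally.track import track

    op = track.Operation("bulk", track.OperationType.Index, param_source="driver-test-param-source")

    plain = track.Task(op)
    completing = track.Task(op, completes_parent=True)

    # completes_parent is now part of task identity
    assert plain != completing
    assert hash(plain) != hash(completing)
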
90 changes: 83 additions & 7 deletions tests/driver/driver_test.py
Expand Up @@ -77,6 +77,27 @@ def test_allocates_two_parallel_tasks(self):
self.assertEqual(3, len(allocator.allocations[1]))
self.assertEqual(2, len(allocator.join_points))
self.assertEqual([{op}], allocator.operations_per_joinpoint)
for join_point in allocator.join_points:
self.assertFalse(join_point.preceding_task_completes_parent)
self.assertEqual(0, join_point.num_clients_executing_completing_task)

def test_a_task_completes_the_parallel_structure(self):
opA = track.Operation("index-completing", track.OperationType.Index, param_source="driver-test-param-source")
opB = track.Operation("index-non-completing", track.OperationType.Index, param_source="driver-test-param-source")
taskA = track.Task(opA, completes_parent=True)
taskB = track.Task(opB)

allocator = driver.Allocator([track.Parallel([taskA, taskB])])

self.assertEqual(2, allocator.clients)
self.assertEqual(3, len(allocator.allocations[0]))
self.assertEqual(3, len(allocator.allocations[1]))
self.assertEqual(2, len(allocator.join_points))
self.assertEqual([{opA, opB}], allocator.operations_per_joinpoint)
final_join_point = allocator.join_points[1]
self.assertTrue(final_join_point.preceding_task_completes_parent)
self.assertEqual(1, final_join_point.num_clients_executing_completing_task)
self.assertEqual([0], final_join_point.clients_executing_completing_task)

def test_allocates_mixed_tasks(self):
op1 = track.Operation("index", track.OperationType.Index, param_source="driver-test-param-source")
@@ -101,6 +122,9 @@ def test_allocates_mixed_tasks(self):
self.assertEqual(11, len(allocator.allocations[2]))
self.assertEqual(6, len(allocator.join_points))
self.assertEqual([{op1}, {op1, op2}, {op1}, {op1}, {op3}], allocator.operations_per_joinpoint)
for join_point in allocator.join_points:
self.assertFalse(join_point.preceding_task_completes_parent)
self.assertEqual(0, join_point.num_clients_executing_completing_task)

def test_allocates_more_tasks_than_clients(self):
op1 = track.Operation("index-a", track.OperationType.Index, param_source="driver-test-param-source")
@@ -110,7 +134,7 @@
op5 = track.Operation("index-e", track.OperationType.Index, param_source="driver-test-param-source")

index_a = track.Task(op1)
- index_b = track.Task(op2)
+ index_b = track.Task(op2, completes_parent=True)
index_c = track.Task(op3)
index_d = track.Task(op4)
index_e = track.Task(op5)
@@ -121,6 +145,7 @@

allocations = allocator.allocations

# 2 clients
self.assertEqual(2, len(allocations))
# join_point, index_a, index_c, index_e, join_point
self.assertEqual(5, len(allocations[0]))
@@ -131,6 +156,11 @@
self.assertEqual([allocations[1][0], index_b, index_d, None, allocations[1][4]], allocations[1])

self.assertEqual([{op1, op2, op3, op4, op5}], allocator.operations_per_joinpoint)
self.assertEqual(2, len(allocator.join_points))
final_join_point = allocator.join_points[1]
self.assertTrue(final_join_point.preceding_task_completes_parent)
self.assertEqual(1, final_join_point.num_clients_executing_completing_task)
self.assertEqual([1], final_join_point.clients_executing_completing_task)

def test_considers_number_of_clients_per_subtask(self):
op1 = track.Operation("index-a", track.OperationType.Index, param_source="driver-test-param-source")
@@ -139,14 +169,15 @@

index_a = track.Task(op1)
index_b = track.Task(op2)
- index_c = track.Task(op3, clients=2)
+ index_c = track.Task(op3, clients=2, completes_parent=True)

allocator = driver.Allocator([track.Parallel(tasks=[index_a, index_b, index_c], clients=3)])

self.assertEqual(3, allocator.clients)

allocations = allocator.allocations

# 3 clients
self.assertEqual(3, len(allocations))
# join_point, index_a, index_c, join_point
self.assertEqual(4, len(allocations[0]))
@@ -161,6 +192,13 @@

self.assertEqual([{op1, op2, op3}], allocator.operations_per_joinpoint)

self.assertEqual(2, len(allocator.join_points))
final_join_point = allocator.join_points[1]
self.assertTrue(final_join_point.preceding_task_completes_parent)
# task index_c has two clients, hence we have to wait for two clients to finish
self.assertEqual(2, final_join_point.num_clients_executing_completing_task)
self.assertEqual([2, 0], final_join_point.clients_executing_completing_task)
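
The expected ``[2, 0]`` here, like the ``[1]`` in the earlier test, is consistent with a simple round-robin assignment of subtask clients to client slots. A toy model of that assignment (the round-robin rule is an inference from the expected values, not the Allocator's actual code)::

    def assign_clients(subtasks, total_clients):
        # subtasks: (name, clients, completes_parent) tuples in declaration order.
        # Returns a client_id -> task-name-list mapping plus the ids of the
        # clients that execute the completing task.
        allocations = {client: [] for client in range(total_clients)}
        completing_clients = []
        client = 0
        for name, clients, completes_parent in subtasks:
            for _ in range(clients):
                allocations[client].append(name)
                if completes_parent:
                    completing_clients.append(client)
                client = (client + 1) % total_clients
        return allocations, completing_clients

    # Reproduces the expectation above: index_c's two clients land on slots 2 and 0.
    _, completing = assign_clients(
        [("index_a", 1, False), ("index_b", 1, False), ("index_c", 2, True)], 3)
    assert completing == [2, 0]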


class IndexManagementTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@@ -400,11 +438,15 @@ def test_execute_schedule_in_throughput_mode(self, es):

sampler = driver.Sampler(client_id=2, task=task, start_timestamp=100)
cancel = threading.Event()
- driver.execute_schedule(cancel, 0, task.operation, schedule, es, sampler)
+ complete = threading.Event()
+
+ execute_schedule = driver.Executor(task, schedule, es, sampler, cancel, complete)
+ execute_schedule()

samples = sampler.samples

self.assertTrue(len(samples) > 0)
self.assertFalse(complete.is_set(), "Executor should not auto-complete a normal task")
previous_absolute_time = -1.0
previous_relative_time = -1.0
for sample in samples:
@@ -443,12 +485,16 @@ def test_execute_schedule_throughput_throttled(self, es):
},
param_source="driver-test-param-source"),
warmup_time_period=0.5, time_period=0.5, clients=4,
params={"target-throughput": target_throughput, "clients": 4})
params={"target-throughput": target_throughput, "clients": 4},
completes_parent=True)
schedule = driver.schedule_for(test_track, task, 0)
sampler = driver.Sampler(client_id=0, task=task, start_timestamp=0)

cancel = threading.Event()
- driver.execute_schedule(cancel, 0, task.operation, schedule, es, sampler)
+ complete = threading.Event()
+
+ execute_schedule = driver.Executor(task, schedule, es, sampler, cancel, complete)
+ execute_schedule()

samples = sampler.samples

@@ -457,6 +503,7 @@
upper_bound = bounds[1]
self.assertTrue(lower_bound <= sample_size <= upper_bound,
msg="Expected sample size to be between %d and %d but was %d" % (lower_bound, upper_bound, sample_size))
self.assertTrue(complete.is_set(), "Executor should auto-complete a task that terminates its parent")

@mock.patch("elasticsearch.Elasticsearch")
def test_cancel_execute_schedule(self, es):
@@ -484,8 +531,11 @@ def test_cancel_execute_schedule(self, es):
sampler = driver.Sampler(client_id=0, task=task, start_timestamp=0)

cancel = threading.Event()
complete = threading.Event()
execute_schedule = driver.Executor(task, schedule, es, sampler, cancel, complete)

cancel.set()
- driver.execute_schedule(cancel, 0, task.operation, schedule, es, sampler)
+ execute_schedule()

samples = sampler.samples

Expand All @@ -500,11 +550,19 @@ class ExpectedUnitTestException(Exception):
def run(*args, **kwargs):
raise ExpectedUnitTestException()

task = track.Task(track.Operation("no-op", track.OperationType.Index.name, params={},
param_source="driver-test-param-source"),
warmup_time_period=0.5, time_period=0.5, clients=4,
params={"clients": 4})

schedule = [(0, metrics.SampleType.Warmup, 0, self.context_managed(run), None)]
sampler = driver.Sampler(client_id=0, task=None, start_timestamp=0)
cancel = threading.Event()
complete = threading.Event()
execute_schedule = driver.Executor(task, schedule, es, sampler, cancel, complete)

with self.assertRaises(ExpectedUnitTestException):
- driver.execute_schedule(cancel, 0, "operation_name", schedule, es, sampler=sampler)
+ execute_schedule()

es.assert_not_called()

@@ -607,6 +665,24 @@ def __str__(self):
ctx.exception.args[0])


class ProfilerTests(TestCase):
def test_profiler_is_a_transparent_wrapper(self):
import time

def f(x):
time.sleep(x)
return x * 2

profiler = driver.Profiler(f, 0, "sleep-operation")
start = time.perf_counter()
# this should take roughly 1 second and should return something
return_value = profiler(1)
end = time.perf_counter()
self.assertEqual(2, return_value)
duration = end - start
self.assertTrue(0.9 <= duration <= 1.2, "Should sleep for roughly 1 second but took [%.2f] seconds." % duration)
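
This new test pins down the wrapper's contract: same arguments in, same return value out, timing essentially unchanged. One way such a transparent wrapper could look (a sketch assuming ``cProfile`` underneath; how the real ``driver.Profiler`` records or reports profiles is not shown in this commit view)::

    import cProfile
    import io
    import pstats

    class Profiler:
        # Sketch of a call-through wrapper: behaves exactly like the wrapped
        # function while recording a profile as a side effect.
        def __init__(self, target, client_id, operation_name):
            self.target = target
            self.client_id = client_id
            self.operation_name = operation_name

        def __call__(self, *args, **kwargs):
            profile = cProfile.Profile()
            try:
                # runcall forwards all arguments and returns the target's result
                return profile.runcall(self.target, *args, **kwargs)
            finally:
                stats_buffer = io.StringIO()
                pstats.Stats(profile, stream=stats_buffer).sort_stats("cumulative").print_stats()
                # a real implementation would log stats_buffer.getvalue(),
                # tagged with client_id and operation_name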


class ClusterHealthCheckTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_waits_for_expected_cluster_status(self, es):