Skip to content

Commit

Permalink
Call the correct functions for scroll-search and paginated-search
Browse files Browse the repository at this point in the history
… operations (#1368)

The `Query` runner services three operation types: `search`,
`paginated-search`, and `scroll-search`. Its `__call__` method dispatches to
the appropriate function for one of these operations based on the
`operation-type` passed in via the `params` dict that it accepts as an argument.

However, the `operation-type` was never actually passed to this method. As a
result, all three operation types ended up calling the same function
(`_request_body_query`), which was not intended.

To fix this, we now inject the `operation-type` into the dict returned by the
relevant parameter source when the runner is invoked. We also register
`SearchParamSource` for both `scroll-search` and `paginated-search` operations.
  • Loading branch information
michaelbaamonde authored Nov 4, 2021
1 parent ba600e3 commit 7460e31
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ Properties
* ``body`` (mandatory): The query body.
* ``response-compression-enabled`` (optional, defaults to ``true``): Allows disabling HTTP compression of responses. As these responses are sometimes large and decompression may be a bottleneck on the client, it is possible to turn off response compression.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data about queries. As it analyzes the corresponding response in more detail, this might incur additional overhead which can skew measurement results. This flag is ineffective for scroll queries.
* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all". See also the ``scroll-search`` operation type.
* ``pages`` (optional, deprecated): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all". This parameter is deprecated and will be replaced with the ``scroll-search`` operation in a future release.
* ``results-per-page`` (optional): Number of documents to retrieve per page. This maps to the Search API's ``size`` parameter, and can be used for scroll and non-scroll searches. Defaults to ``10``.

Example::
Expand Down
16 changes: 14 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2213,6 +2213,7 @@ def __init__(self, task_allocation, sched, task_progress_control, runner, params
:return: A generator for the corresponding parameters.
"""
self.task_allocation = task_allocation
self.operation_type = task_allocation.task.operation.type
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
Expand Down Expand Up @@ -2243,6 +2244,11 @@ def before_request(self, now):
def after_request(self, now, weight, unit, request_meta_data):
self.sched.after_request(now, weight, unit, request_meta_data)

def params_with_operation_type(self):
    """
    Return the next parameters from the parameter source with the operation type added.

    Calls ``self.params.params()`` once and returns its entries together with an
    ``operation-type`` entry set to ``self.operation_type``. An ``operation-type``
    entry already present in the source's parameters is overridden.

    Any exception raised by ``self.params.params()`` propagates unchanged.

    :return: A new dict; the mapping returned by the parameter source is not modified.
    """
    # Build a fresh dict rather than calling ``update`` on the object handed out by the
    # parameter source: a source may return the same dict on every call, and injecting
    # the key in place would leak driver-internal state into it. The copy is shallow, so
    # nested values (e.g. a request body) are still shared with the source.
    return {**self.params.params(), "operation-type": self.operation_type}

async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
Expand All @@ -2253,7 +2259,13 @@ async def __call__(self):
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = self.params.percent_completed if param_source_knows_progress else None
# current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner, self.params.params())
yield (
next_scheduled,
self.task_progress_control.sample_type,
percent_completed,
self.runner,
self.params_with_operation_type(),
)
self.task_progress_control.next()
except StopIteration:
return
Expand All @@ -2267,7 +2279,7 @@ async def __call__(self):
self.task_progress_control.sample_type,
self.task_progress_control.percent_completed,
self.runner,
self.params.params(),
self.params_with_operation_type(),
)
self.task_progress_control.next()
except StopIteration:
Expand Down
26 changes: 15 additions & 11 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def runner_for(operation_type):
try:
return __RUNNERS[operation_type]
except KeyError:
raise exceptions.RallyError("No runner available for operation type [%s]" % operation_type)
raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]")


def enable_assertions(enabled):
Expand Down Expand Up @@ -766,6 +766,7 @@ class Query(Runner):
It expects at least the following keys in the `params` hash:
* `operation-type`: One of `search`, `paginated-search`, or `scroll-search`.
* `index`: The index or indices against which to issue the query.
* `type`: See `index`
* `cache`: True iff the request cache should be used.
Expand Down Expand Up @@ -816,6 +817,7 @@ async def __call__(self, es, params):
# by the composite's parameter source.
index = mandatory(params, "index", self)
body = mandatory(params, "body", self)
operation_type = params.get("operation-type")
size = params.get("results-per-page")
if size:
body["size"] = size
Expand Down Expand Up @@ -970,19 +972,21 @@ async def _scroll_query(es, params):
"took": took,
}

search_method = params.get("operation-type")
if search_method == "paginated-search":
if operation_type == "paginated-search":
return await _search_after_query(es, params)
elif search_method == "scroll-search":
return await _scroll_query(es, params)
elif "pages" in params:
logging.getLogger(__name__).warning(
"Invoking a scroll search with the 'search' operation is deprecated "
"and will be removed in a future release. Use 'scroll-search' instead."
)
elif operation_type == "scroll-search":
return await _scroll_query(es, params)
elif operation_type == "search":
if "pages" in params:
logging.getLogger(__name__).warning(
"Invoking a scroll search with the 'search' operation is deprecated "
"and will be removed in a future release. Use 'scroll-search' instead."
)
return await _scroll_query(es, params)
else:
return await _request_body_query(es, params)
else:
return await _request_body_query(es, params)
raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]")

async def _raw_search(self, es, doc_type, index, body, params, headers=None):
components = []
Expand Down
2 changes: 2 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,8 @@ def read_bulk(self):

register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource)
register_param_source_for_operation(track.OperationType.Search, SearchParamSource)
register_param_source_for_operation(track.OperationType.ScrollSearch, SearchParamSource)
register_param_source_for_operation(track.OperationType.PaginatedSearch, SearchParamSource)
register_param_source_for_operation(track.OperationType.CreateIndex, CreateIndexParamSource)
register_param_source_for_operation(track.OperationType.DeleteIndex, DeleteIndexParamSource)
register_param_source_for_operation(track.OperationType.CreateDataStream, CreateDataStreamParamSource)
Expand Down
102 changes: 51 additions & 51 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,14 +958,14 @@ async def test_search_task_one_client(self):
schedule = driver.schedule_for(task_allocation, param_source)

expected_schedule = [
(0, metrics.SampleType.Warmup, 1 / 8, {}),
(0.1, metrics.SampleType.Warmup, 2 / 8, {}),
(0.2, metrics.SampleType.Warmup, 3 / 8, {}),
(0.3, metrics.SampleType.Normal, 4 / 8, {}),
(0.4, metrics.SampleType.Normal, 5 / 8, {}),
(0.5, metrics.SampleType.Normal, 6 / 8, {}),
(0.6, metrics.SampleType.Normal, 7 / 8, {}),
(0.7, metrics.SampleType.Normal, 8 / 8, {}),
(0, metrics.SampleType.Warmup, 1 / 8, {"operation-type": "search"}),
(0.1, metrics.SampleType.Warmup, 2 / 8, {"operation-type": "search"}),
(0.2, metrics.SampleType.Warmup, 3 / 8, {"operation-type": "search"}),
(0.3, metrics.SampleType.Normal, 4 / 8, {"operation-type": "search"}),
(0.4, metrics.SampleType.Normal, 5 / 8, {"operation-type": "search"}),
(0.5, metrics.SampleType.Normal, 6 / 8, {"operation-type": "search"}),
(0.6, metrics.SampleType.Normal, 7 / 8, {"operation-type": "search"}),
(0.7, metrics.SampleType.Normal, 8 / 8, {"operation-type": "search"}),
]
await self.assert_schedule(expected_schedule, schedule)

Expand All @@ -984,12 +984,12 @@ async def test_search_task_two_clients(self):
schedule = driver.schedule_for(task_allocation, param_source)

expected_schedule = [
(0, metrics.SampleType.Warmup, 1 / 6, {}),
(0.2, metrics.SampleType.Normal, 2 / 6, {}),
(0.4, metrics.SampleType.Normal, 3 / 6, {}),
(0.6, metrics.SampleType.Normal, 4 / 6, {}),
(0.8, metrics.SampleType.Normal, 5 / 6, {}),
(1.0, metrics.SampleType.Normal, 6 / 6, {}),
(0, metrics.SampleType.Warmup, 1 / 6, {"operation-type": "search"}),
(0.2, metrics.SampleType.Normal, 2 / 6, {"operation-type": "search"}),
(0.4, metrics.SampleType.Normal, 3 / 6, {"operation-type": "search"}),
(0.6, metrics.SampleType.Normal, 4 / 6, {"operation-type": "search"}),
(0.8, metrics.SampleType.Normal, 5 / 6, {"operation-type": "search"}),
(1.0, metrics.SampleType.Normal, 6 / 6, {"operation-type": "search"}),
]
await self.assert_schedule(expected_schedule, schedule)

Expand All @@ -1014,9 +1014,9 @@ async def test_schedule_param_source_determines_iterations_no_warmup(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}),
(1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "size": 3}),
(2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "size": 3}),
(0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}),
(1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}),
(2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}),
],
schedule,
)
Expand All @@ -1042,11 +1042,11 @@ async def test_schedule_param_source_determines_iterations_including_warmup(self

await self.assert_schedule(
[
(0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}),
(1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "size": 5}),
(2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}),
(3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}),
(4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
(0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
],
schedule,
)
Expand All @@ -1072,7 +1072,7 @@ async def test_schedule_defaults_to_iteration_based(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}),
(0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"], "operation-type": "bulk"}),
],
schedule,
)
Expand All @@ -1098,17 +1098,17 @@ async def test_schedule_for_warmup_time_based(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "size": 11}),
(1.0, metrics.SampleType.Normal, 2 / 11, {"body": ["a"], "size": 11}),
(2.0, metrics.SampleType.Normal, 3 / 11, {"body": ["a"], "size": 11}),
(3.0, metrics.SampleType.Normal, 4 / 11, {"body": ["a"], "size": 11}),
(4.0, metrics.SampleType.Normal, 5 / 11, {"body": ["a"], "size": 11}),
(5.0, metrics.SampleType.Normal, 6 / 11, {"body": ["a"], "size": 11}),
(6.0, metrics.SampleType.Normal, 7 / 11, {"body": ["a"], "size": 11}),
(7.0, metrics.SampleType.Normal, 8 / 11, {"body": ["a"], "size": 11}),
(8.0, metrics.SampleType.Normal, 9 / 11, {"body": ["a"], "size": 11}),
(9.0, metrics.SampleType.Normal, 10 / 11, {"body": ["a"], "size": 11}),
(10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "size": 11}),
(0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(1.0, metrics.SampleType.Normal, 2 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(2.0, metrics.SampleType.Normal, 3 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(3.0, metrics.SampleType.Normal, 4 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(4.0, metrics.SampleType.Normal, 5 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(5.0, metrics.SampleType.Normal, 6 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(6.0, metrics.SampleType.Normal, 7 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(7.0, metrics.SampleType.Normal, 8 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(8.0, metrics.SampleType.Normal, 9 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(9.0, metrics.SampleType.Normal, 10 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
(10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}),
],
schedule,
)
Expand All @@ -1134,11 +1134,11 @@ async def test_infinite_schedule_without_progress_indication(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(1.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(2.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(3.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(4.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(0.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}),
(1.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}),
(2.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}),
(3.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}),
(4.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}),
],
schedule,
infinite_schedule=True,
Expand All @@ -1165,11 +1165,11 @@ async def test_finite_schedule_with_progress_indication(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}),
(1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "size": 5}),
(2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}),
(3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}),
(4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
(0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
(4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}),
],
schedule,
infinite_schedule=False,
Expand All @@ -1192,11 +1192,11 @@ async def test_schedule_with_progress_determined_by_runner(self):

await self.assert_schedule(
[
(0.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(1.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(2.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(3.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(4.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
(0.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}),
(1.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}),
(2.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}),
(3.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}),
(4.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}),
],
schedule,
infinite_schedule=True,
Expand Down Expand Up @@ -1240,7 +1240,7 @@ async def test_schedule_for_time_based(self):
self.assertTrue(round(progress_percent, 2) >= 0.0, "progress should be >= 0.0 but was [%f]" % progress_percent)
self.assertTrue(round(progress_percent, 2) <= 1.0, "progress should be <= 1.0 but was [%f]" % progress_percent)
self.assertIsNotNone(runner, "runner must be defined")
self.assertEqual({"body": ["a"], "size": 11}, params)
self.assertEqual({"body": ["a"], "operation-type": "bulk", "size": 11}, params)

@run_async
async def test_schedule_for_time_based_with_multiple_clients(self):
Expand Down Expand Up @@ -1287,7 +1287,7 @@ async def test_schedule_for_time_based_with_multiple_clients(self):
self.assertTrue(round(progress_percent, 2) >= 0.0, "progress should be >= 0.0 but was [%f]" % progress_percent)
self.assertTrue(round(progress_percent, 2) <= 1.0, "progress should be <= 1.0 but was [%f]" % progress_percent)
self.assertIsNotNone(runner, "runner must be defined")
self.assertEqual({"body": ["a"], "size": 11}, params)
self.assertEqual({"body": ["a"], "operation-type": "bulk", "size": 11}, params)


class AsyncExecutorTests(TestCase):
Expand Down
Loading

0 comments on commit 7460e31

Please sign in to comment.