From 7460e31f79c3a9af80f554bef6cbe238b11b98fb Mon Sep 17 00:00:00 2001 From: Mike Baamonde Date: Thu, 4 Nov 2021 09:18:45 -0400 Subject: [PATCH] Call the correct functions for `scroll-search` and `paginated-search` operations (#1368) The `Query` runner services three operation types: `search`, `paginated-search`, and `scroll-search`. Its `__call__` method dispatches to the appropriate function for one of these operations based on the `operation-type` passed in via the `params` dict that it accepts as an argument. The `operation-type` actually never got passed to this method, however. This resulted in all three operation types ultimately calling the same function (`request_body_query`), which was not intentional. To fix this, we now inject the `operation-type` into the dict returned by the relevant parameter source when the runner is invoked. We also register `SearchParamSource` for both `scroll-search` and `paginated-search` operations. --- docs/track.rst | 2 +- esrally/driver/driver.py | 16 +++++- esrally/driver/runner.py | 26 +++++---- esrally/track/params.py | 2 + tests/driver/driver_test.py | 102 ++++++++++++++++++------------------ tests/driver/runner_test.py | 90 +++++++++++++++++++++++++++++++ tests/track/params_test.py | 2 +- 7 files changed, 174 insertions(+), 66 deletions(-) diff --git a/docs/track.rst b/docs/track.rst index 76c16e047..89e7a961d 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -968,7 +968,7 @@ Properties * ``body`` (mandatory): The query body. * ``response-compression-enabled`` (optional, defaults to ``true``): Allows to disable HTTP compression of responses. As these responses are sometimes large and decompression may be a bottleneck on the client, it is possible to turn off response compression. * ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data about queries. As it analyzes the corresponding response in more detail, this might incur additional overhead which can skew measurement results. This flag is ineffective for scroll queries. -* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all". See also the ``scroll-search`` operation type. +* ``pages`` (optional, deprecated): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all". This parameter is deprecated and will be replaced with the ``scroll-search`` operation in a future release. * ``results-per-page`` (optional): Number of documents to retrieve per page. This maps to the Search API's ``size`` parameter, and can be used for scroll and non-scroll searches. Defaults to ``10`` Example:: diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 0b7d68615..771ebcbda 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -2213,6 +2213,7 @@ def __init__(self, task_allocation, sched, task_progress_control, runner, params :return: A generator for the corresponding parameters. """ self.task_allocation = task_allocation + self.operation_type = task_allocation.task.operation.type self.sched = sched self.task_progress_control = task_progress_control self.runner = runner @@ -2243,6 +2244,11 @@ def before_request(self, now): def after_request(self, now, weight, unit, request_meta_data): self.sched.after_request(now, weight, unit, request_meta_data) + def params_with_operation_type(self): + p = self.params.params() + p.update({"operation-type": self.operation_type}) + return p + async def __call__(self): next_scheduled = 0 if self.task_progress_control.infinite: @@ -2253,7 +2259,13 @@ async def __call__(self): # does not contribute at all to completion. Hence, we cannot define completion. percent_completed = self.params.percent_completed if param_source_knows_progress else None # current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) - yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner, self.params.params()) + yield ( + next_scheduled, + self.task_progress_control.sample_type, + percent_completed, + self.runner, + self.params_with_operation_type(), + ) self.task_progress_control.next() except StopIteration: return @@ -2267,7 +2279,7 @@ async def __call__(self): self.task_progress_control.sample_type, self.task_progress_control.percent_completed, self.runner, - self.params.params(), + self.params_with_operation_type(), ) self.task_progress_control.next() except StopIteration: diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index ade81276b..9228b182c 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -103,7 +103,7 @@ def runner_for(operation_type): try: return __RUNNERS[operation_type] except KeyError: - raise exceptions.RallyError("No runner available for operation type [%s]" % operation_type) + raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]") def enable_assertions(enabled): @@ -766,6 +766,7 @@ class Query(Runner): It expects at least the following keys in the `params` hash: + * `operation-type`: One of `search`, `paginated-search`, or `scroll-search`. * `index`: The index or indices against which to issue the query. * `type`: See `index` * `cache`: True iff the request cache should be used. @@ -816,6 +817,7 @@ async def __call__(self, es, params): # by the composite's parameter source. index = mandatory(params, "index", self) body = mandatory(params, "body", self) + operation_type = params.get("operation-type") size = params.get("results-per-page") if size: body["size"] = size @@ -970,19 +972,21 @@ async def _scroll_query(es, params): "took": took, } - search_method = params.get("operation-type") - if search_method == "paginated-search": + if operation_type == "paginated-search": return await _search_after_query(es, params) - elif search_method == "scroll-search": - return await _scroll_query(es, params) - elif "pages" in params: - logging.getLogger(__name__).warning( - "Invoking a scroll search with the 'search' operation is deprecated " - "and will be removed in a future release. Use 'scroll-search' instead." - ) + elif operation_type == "scroll-search": return await _scroll_query(es, params) + elif operation_type == "search": + if "pages" in params: + logging.getLogger(__name__).warning( + "Invoking a scroll search with the 'search' operation is deprecated " + "and will be removed in a future release. Use 'scroll-search' instead." + ) + return await _scroll_query(es, params) + else: + return await _request_body_query(es, params) else: - return await _request_body_query(es, params) + raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]") async def _raw_search(self, es, doc_type, index, body, params, headers=None): components = [] diff --git a/esrally/track/params.py b/esrally/track/params.py index 6fd8bee69..a7c87b10f 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -1324,6 +1324,8 @@ def read_bulk(self): register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource) register_param_source_for_operation(track.OperationType.Search, SearchParamSource) +register_param_source_for_operation(track.OperationType.ScrollSearch, SearchParamSource) +register_param_source_for_operation(track.OperationType.PaginatedSearch, SearchParamSource) register_param_source_for_operation(track.OperationType.CreateIndex, CreateIndexParamSource) register_param_source_for_operation(track.OperationType.DeleteIndex, DeleteIndexParamSource) register_param_source_for_operation(track.OperationType.CreateDataStream, CreateDataStreamParamSource) diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 7ca8603f4..cdf8a0a90 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -958,14 +958,14 @@ async def test_search_task_one_client(self): schedule = driver.schedule_for(task_allocation, param_source) expected_schedule = [ - (0, metrics.SampleType.Warmup, 1 / 8, {}), - (0.1, metrics.SampleType.Warmup, 2 / 8, {}), - (0.2, metrics.SampleType.Warmup, 3 / 8, {}), - (0.3, metrics.SampleType.Normal, 4 / 8, {}), - (0.4, metrics.SampleType.Normal, 5 / 8, {}), - (0.5, metrics.SampleType.Normal, 6 / 8, {}), - (0.6, metrics.SampleType.Normal, 7 / 8, {}), - (0.7, metrics.SampleType.Normal, 8 / 8, {}), + (0, metrics.SampleType.Warmup, 1 / 8, {"operation-type": "search"}), + (0.1, metrics.SampleType.Warmup, 2 / 8, {"operation-type": "search"}), + (0.2, metrics.SampleType.Warmup, 3 / 8, {"operation-type": "search"}), + (0.3, metrics.SampleType.Normal, 4 / 8, {"operation-type": "search"}), + (0.4, metrics.SampleType.Normal, 5 / 8, {"operation-type": "search"}), + (0.5, metrics.SampleType.Normal, 6 / 8, {"operation-type": "search"}), + (0.6, metrics.SampleType.Normal, 7 / 8, {"operation-type": "search"}), + (0.7, metrics.SampleType.Normal, 8 / 8, {"operation-type": "search"}), ] await self.assert_schedule(expected_schedule, schedule) @@ -984,12 +984,12 @@ async def test_search_task_two_clients(self): schedule = driver.schedule_for(task_allocation, param_source) expected_schedule = [ - (0, metrics.SampleType.Warmup, 1 / 6, {}), - (0.2, metrics.SampleType.Normal, 2 / 6, {}), - (0.4, metrics.SampleType.Normal, 3 / 6, {}), - (0.6, metrics.SampleType.Normal, 4 / 6, {}), - (0.8, metrics.SampleType.Normal, 5 / 6, {}), - (1.0, metrics.SampleType.Normal, 6 / 6, {}), + (0, metrics.SampleType.Warmup, 1 / 6, {"operation-type": "search"}), + (0.2, metrics.SampleType.Normal, 2 / 6, {"operation-type": "search"}), + (0.4, metrics.SampleType.Normal, 3 / 6, {"operation-type": "search"}), + (0.6, metrics.SampleType.Normal, 4 / 6, {"operation-type": "search"}), + (0.8, metrics.SampleType.Normal, 5 / 6, {"operation-type": "search"}), + (1.0, metrics.SampleType.Normal, 6 / 6, {"operation-type": "search"}), ] await self.assert_schedule(expected_schedule, schedule) @@ -1014,9 +1014,9 @@ async def test_schedule_param_source_determines_iterations_no_warmup(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}), - (1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "size": 3}), - (2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "size": 3}), + (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}), + (1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}), + (2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "operation-type": "bulk", "size": 3}), ], schedule, ) @@ -1042,11 +1042,11 @@ async def test_schedule_param_source_determines_iterations_including_warmup(self await self.assert_schedule( [ - (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}), - (1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "size": 5}), - (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}), - (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}), - (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}), + (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), ], schedule, ) @@ -1072,7 +1072,7 @@ async def test_schedule_defaults_to_iteration_based(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}), + (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"], "operation-type": "bulk"}), ], schedule, ) @@ -1098,17 +1098,17 @@ async def test_schedule_for_warmup_time_based(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "size": 11}), - (1.0, metrics.SampleType.Normal, 2 / 11, {"body": ["a"], "size": 11}), - (2.0, metrics.SampleType.Normal, 3 / 11, {"body": ["a"], "size": 11}), - (3.0, metrics.SampleType.Normal, 4 / 11, {"body": ["a"], "size": 11}), - (4.0, metrics.SampleType.Normal, 5 / 11, {"body": ["a"], "size": 11}), - (5.0, metrics.SampleType.Normal, 6 / 11, {"body": ["a"], "size": 11}), - (6.0, metrics.SampleType.Normal, 7 / 11, {"body": ["a"], "size": 11}), - (7.0, metrics.SampleType.Normal, 8 / 11, {"body": ["a"], "size": 11}), - (8.0, metrics.SampleType.Normal, 9 / 11, {"body": ["a"], "size": 11}), - (9.0, metrics.SampleType.Normal, 10 / 11, {"body": ["a"], "size": 11}), - (10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "size": 11}), + (0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (1.0, metrics.SampleType.Normal, 2 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (2.0, metrics.SampleType.Normal, 3 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (3.0, metrics.SampleType.Normal, 4 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (4.0, metrics.SampleType.Normal, 5 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (5.0, metrics.SampleType.Normal, 6 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (6.0, metrics.SampleType.Normal, 7 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (7.0, metrics.SampleType.Normal, 8 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (8.0, metrics.SampleType.Normal, 9 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (9.0, metrics.SampleType.Normal, 10 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), + (10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "operation-type": "bulk", "size": 11}), ], schedule, ) @@ -1134,11 +1134,11 @@ async def test_infinite_schedule_without_progress_indication(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (1.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (2.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (3.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (4.0, metrics.SampleType.Normal, None, {"body": ["a"]}), + (0.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}), + (1.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}), + (2.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}), + (3.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}), + (4.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "bulk"}), ], schedule, infinite_schedule=True, @@ -1165,11 +1165,11 @@ async def test_finite_schedule_with_progress_indication(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}), - (1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "size": 5}), - (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}), - (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}), - (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}), + (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), + (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "operation-type": "bulk", "size": 5}), ], schedule, infinite_schedule=False, @@ -1192,11 +1192,11 @@ async def test_schedule_with_progress_determined_by_runner(self): await self.assert_schedule( [ - (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (1.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (2.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (3.0, metrics.SampleType.Normal, None, {"body": ["a"]}), - (4.0, metrics.SampleType.Normal, None, {"body": ["a"]}), + (0.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}), + (1.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}), + (2.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}), + (3.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}), + (4.0, metrics.SampleType.Normal, None, {"body": ["a"], "operation-type": "driver-test-runner-with-completion"}), ], schedule, infinite_schedule=True, @@ -1240,7 +1240,7 @@ async def test_schedule_for_time_based(self): self.assertTrue(round(progress_percent, 2) >= 0.0, "progress should be >= 0.0 but was [%f]" % progress_percent) self.assertTrue(round(progress_percent, 2) <= 1.0, "progress should be <= 1.0 but was [%f]" % progress_percent) self.assertIsNotNone(runner, "runner must be defined") - self.assertEqual({"body": ["a"], "size": 11}, params) + self.assertEqual({"body": ["a"], "operation-type": "bulk", "size": 11}, params) @run_async async def test_schedule_for_time_based_with_multiple_clients(self): @@ -1287,7 +1287,7 @@ async def test_schedule_for_time_based_with_multiple_clients(self): self.assertTrue(round(progress_percent, 2) >= 0.0, "progress should be >= 0.0 but was [%f]" % progress_percent) self.assertTrue(round(progress_percent, 2) <= 1.0, "progress should be <= 1.0 but was [%f]" % progress_percent) self.assertIsNotNone(runner, "runner must be defined") - self.assertEqual({"body": ["a"], "size": 11}, params) + self.assertEqual({"body": ["a"], "operation-type": "bulk", "size": 11}, params) class AsyncExecutorTests(TestCase): diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 578faf32f..c21142cca 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1365,6 +1365,7 @@ async def test_query_match_only_request_body_defined(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "_all", "detailed-results": True, "cache": True, @@ -1410,6 +1411,7 @@ async def test_query_with_timeout_and_headers(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "_all", "detailed-results": True, "cache": True, @@ -1460,6 +1462,7 @@ async def test_query_match_using_request_params(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "_all", "cache": False, "detailed-results": True, @@ -1513,6 +1516,7 @@ async def test_query_no_detailed_results(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "_all", "body": None, "request-params": {"q": "user:kimchy"}, @@ -1560,6 +1564,7 @@ async def test_query_hits_total_as_number(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "_all", "cache": True, "detailed-results": True, @@ -1614,6 +1619,7 @@ async def test_query_match_all(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "unittest", "detailed-results": True, "response-compression-enabled": False, @@ -1666,6 +1672,7 @@ async def test_query_match_all_doc_type_fallback(self, es): query_runner = runner.Query() params = { + "operation-type": "search", "index": "unittest", "type": "type", "detailed-results": True, @@ -1720,6 +1727,7 @@ async def test_scroll_query_only_one_page(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "pages": 1, "results-per-page": 100, "index": "unittest", @@ -1780,6 +1788,7 @@ async def test_scroll_query_no_request_cache(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "pages": 1, "results-per-page": 100, "index": "unittest", @@ -1835,6 +1844,7 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "index": "_all", "pages": 1, "results-per-page": 100, @@ -1916,6 +1926,7 @@ async def test_scroll_query_with_explicit_number_of_pages(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "pages": 2, "results-per-page": 2, "index": "unittest", @@ -1963,6 +1974,7 @@ async def test_scroll_query_cannot_clear_scroll(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "pages": 5, "results-per-page": 100, "index": "unittest", @@ -2027,6 +2039,7 @@ async def test_scroll_query_request_all_pages(self, es): query_runner = runner.Query() params = { + "operation-type": "scroll-search", "pages": "all", "results-per-page": 4, "index": "unittest", @@ -2052,6 +2065,83 @@ async def test_scroll_query_request_all_pages(self, es): es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_query_runner_search_with_pages_logs_warning_and_executes(self, es): + # page 1 + search_response = { + "_scroll_id": "some-scroll-id", + "took": 4, + "timed_out": False, + "hits": { + "total": {"value": 2, "relation": "eq"}, + "hits": [ + {"title": "some-doc-1"}, + {"title": "some-doc-2"}, + ], + }, + } + + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) + + query_runner = runner.Query() + + params = { + "operation-type": "search", + "pages": 1, + "results-per-page": 100, + "index": "unittest", + "cache": True, + "body": { + "query": { + "match_all": {}, + }, + }, + } + + with mock.patch.object(query_runner.logger, "warning") as mocked_warning_logger: + results = await query_runner(es, params) + mocked_warning_logger.assert_has_calls( + [ + mock.call( + "Invoking a scroll search with the 'search' operation is deprecated " + "and will be removed in a future release. Use 'scroll-search' instead." + ) + ] + ) + + self.assertEqual(1, results["weight"]) + self.assertEqual(1, results["pages"]) + self.assertEqual(2, results["hits"]) + self.assertEqual("eq", results["hits_relation"]) + self.assertEqual(4, results["took"]) + self.assertEqual("pages", results["unit"]) + self.assertFalse(results["timed_out"]) + self.assertFalse("error-type" in results) + + @mock.patch("elasticsearch.Elasticsearch") + @run_async + async def test_query_runner_fails_with_unknown_operation_type(self, es): + query_runner = runner.Query() + + params = { + "operation-type": "unknown", + "index": "unittest", + "body": { + "query": { + "match_all": {}, + }, + }, + } + + with self.assertRaises(exceptions.RallyError) as ctx: + await query_runner(es, params) + self.assertEqual( + "No runner available for operation-type: [unknown]", + ctx.exception.args[0], + ) + class PutPipelineRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") diff --git a/tests/track/params_test.py b/tests/track/params_test.py index f74a4248f..6f6125f5c 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -2580,7 +2580,7 @@ def test_passes_request_parameters(self): p["body"], ) - def test_user_specified_overrides_defaults(self): + def test_user_specified_index_overrides_defaults(self): index1 = track.Index(name="index1", types=["type1"]) source = params.SearchParamSource(