diff --git a/docs/track.rst b/docs/track.rst index ff591ab59..304cf64c9 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -1302,8 +1302,8 @@ shrink-index With the operation ``shrink-index`` you can execute the `shrink index API `_. Note that this does not correspond directly to the shrink index API call in Elasticsearch but it is a high-level operation that executes all the necessary low-level operations under the hood to shrink an index. It supports the following parameters: -* ``source-index`` (mandatory): The name of the index that should be shrinked. -* ``target-index`` (mandatory): The name of the index that contains the shrinked shards. +* ``source-index`` (mandatory): The name of the index that should be shrinked. Multiple indices can be defined using the `Multi-target syntax `_. +* ``target-index`` (mandatory): The name of the index that contains the shrinked shards. If multiple indices match ``source-index``, one shrink operation will execute for every matching index. Each shrink operation will use a modified ``target-index``: the unique suffix of the source index (derived by removing the common prefix of all matching source indices) will be appended to ``target-index``. See also the example below. * ``target-body`` (mandatory): The body containing settings and aliases for ``target-index``. * ``shrink-node`` (optional, defaults to a random data node): As a first step, the source index needs to be fully relocated to a single node. Rally will automatically choose a random data node in the cluster but you can choose one explicitly if needed. @@ -1325,6 +1325,24 @@ Example:: This will shrink the index ``src`` to ``target``. The target index will consist of one shard and have one replica. With ``shrink-node`` we also explicitly specify the name of the node where we want the source index to be relocated to. 
+In the following example, ``src*`` matches a list of indices ``src-a,src-b``:: + + { + "operation-type": "shrink-index", + "shrink-node": "rally-node-0", + "source-index": "src*", + "target-index": "target", + "target-body": { + "settings": { + "index.number_of_replicas": 1, + "index.number_of_shards": 1, + "index.codec": "best_compression" + } + } + } + +and will shrink ``src-a`` into ``target-a`` and ``src-b`` into ``target-b``. + This operation is :ref:`retryable `. delete-ml-datafeed diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 2dc4e7fbf..2b910fc36 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -23,6 +23,8 @@ import types from collections import Counter, OrderedDict from copy import deepcopy +from os.path import commonprefix + import ijson from esrally import exceptions, track @@ -301,8 +303,17 @@ def mandatory(params, key, op): try: return params[key] except KeyError: - raise exceptions.DataError("Parameter source for operation '%s' did not provide the mandatory parameter '%s'. Please add it to your" - " parameter source." % (str(op), key)) + raise exceptions.DataError( + f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. 
" + f"Add it to your parameter source and try again.") + + +# TODO: remove and use https://docs.python.org/3/library/stdtypes.html#str.removeprefix +# once Python 3.9 becomes the minimum version +def remove_prefix(string, prefix): + if string.startswith(prefix): + return string[len(prefix):] + return string def escape(v): @@ -1320,12 +1331,19 @@ async def _wait_for(self, es, idx, description): async def __call__(self, es, params): source_index = mandatory(params, "source-index", self) + source_indices_get = await es.indices.get(source_index) + source_indices = list(source_indices_get.keys()) + source_indices_stem = commonprefix(source_indices) + target_index = mandatory(params, "target-index", self) + # we need to inject additional settings so we better copy the body target_body = deepcopy(mandatory(params, "target-body", self)) shrink_node = params.get("shrink-node") # Choose a random data node if none is specified - if not shrink_node: + if shrink_node: + node_names = [shrink_node] + else: node_names = [] # choose a random data node node_info = await es.nodes.info() @@ -1333,35 +1351,40 @@ async def __call__(self, es, params): if "data" in node["roles"]: node_names.append(node["name"]) if not node_names: - raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Please specify it explicitly.") + raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. 
Specify it explicitly.") + + for source_index in source_indices: shrink_node = random.choice(node_names) - self.logger.info("Using [%s] as shrink node.", shrink_node) - self.logger.info("Preparing [%s] for shrinking.", source_index) - # prepare index for shrinking - await es.indices.put_settings(index=source_index, - body={ - "settings": { - "index.routing.allocation.require._name": shrink_node, - "index.blocks.write": "true" - } - }, - preserve_existing=True) - - self.logger.info("Waiting for relocation to finish for index [%s]...", source_index) - await self._wait_for(es, source_index, "shard relocation for index [{}]".format(source_index)) - self.logger.info("Shrinking [%s] to [%s].", source_index, target_index) - if "settings" not in target_body: - target_body["settings"] = {} - target_body["settings"]["index.routing.allocation.require._name"] = None - target_body["settings"]["index.blocks.write"] = None - # kick off the shrink operation - await es.indices.shrink(index=source_index, target=target_index, body=target_body) - - self.logger.info("Waiting for shrink to finish for index [%s]...", source_index) - await self._wait_for(es, target_index, "shrink for index [{}]".format(target_index)) - self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, target_index) + self.logger.info("Using [%s] as shrink node.", shrink_node) + self.logger.info("Preparing [%s] for shrinking.", source_index) + + # prepare index for shrinking + await es.indices.put_settings(index=source_index, + body={ + "settings": { + "index.routing.allocation.require._name": shrink_node, + "index.blocks.write": "true" + } + }, + preserve_existing=True) + + self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index) + await self._wait_for(es, source_index, f"shard relocation for index [{source_index}]") + self.logger.info("Shrinking [%s] to [%s].", source_index, target_index) + if "settings" not in target_body: + target_body["settings"] = {} + 
target_body["settings"]["index.routing.allocation.require._name"] = None + target_body["settings"]["index.blocks.write"] = None + # kick off the shrink operation + index_suffix = remove_prefix(source_index, source_indices_stem) + final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix + await es.indices.shrink(index=source_index, target=final_target_index, body=target_body) + + self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index) + await self._wait_for(es, final_target_index, f"shrink for index [{final_target_index}]") + self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, final_target_index) # ops_count is not really important for this operation... - return 1, "ops" + return len(source_indices), "ops" def __repr__(self, *args, **kwargs): return "shrink-index" diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index afc9567a5..141e53568 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -235,8 +235,9 @@ async def test_bulk_index_missing_params(self, es): with self.assertRaises(exceptions.DataError) as ctx: await bulk(es, bulk_params) - self.assertEqual("Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action-metadata-present'. " - "Please add it to your parameter source.", ctx.exception.args[0]) + self.assertEqual( + "Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action-metadata-present'. " + "Add it to your parameter source and try again.", ctx.exception.args[0]) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -2042,7 +2043,7 @@ async def test_param_body_mandatory(self, es): } with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. 
" - "Please add it to your parameter source."): + "Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.ingest.put_pipeline.call_count) @@ -2059,7 +2060,7 @@ async def test_param_id_mandatory(self, es): } with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. " - "Please add it to your parameter source."): + "Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.ingest.put_pipeline.call_count) @@ -2310,7 +2311,7 @@ async def test_param_indices_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'create-index' did not provide the mandatory parameter 'indices'. " - "Please add it to your parameter source."): + "Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.create.call_count) @@ -2355,7 +2356,7 @@ async def test_param_data_streams_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'create-data-stream' did not provide the " - "mandatory parameter 'data-streams'. Please add it to your parameter source."): + "mandatory parameter 'data-streams'. Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.create_data_stream.call_count) @@ -2494,7 +2495,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'create-index-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. 
Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.put_template.call_count) @@ -2565,7 +2566,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'delete-index-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.delete_template.call_count) @@ -2607,7 +2608,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'create-component-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.cluster.put_component_template.call_count) @@ -2679,7 +2680,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'delete-component-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.delete_template.call_count) @@ -2722,7 +2723,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'create-composable-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. 
Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.cluster.put_index_template.call_count) @@ -2794,7 +2795,7 @@ async def test_param_templates_mandatory(self, es): params = {} with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'delete-composable-template' did not provide the mandatory parameter " - "'templates'. Please add it to your parameter source."): + "'templates'. Add it to your parameter source and try again."): await r(es, params) self.assertEqual(0, es.indices.delete_index_template.call_count) @@ -3323,7 +3324,7 @@ async def test_missing_parameter(self, sleep, es): r = runner.Sleep() with self.assertRaisesRegex(exceptions.DataError, "Parameter source for operation 'sleep' did not provide the mandatory parameter " - "'duration'. Please add it to your parameter source."): + "'duration'. Add it to your parameter source and try again."): await r(es, params={}) self.assertEqual(0, es.call_count) @@ -3862,6 +3863,9 @@ class ShrinkIndexTests(TestCase): @mock.patch("asyncio.sleep", return_value=as_future()) @run_async async def test_shrink_index_with_shrink_node(self, sleep, es): + es.indices.get.return_value = as_future({ + "src": {} + }) # cluster health API es.cluster.health.return_value = as_future({ "status": "green", @@ -3913,6 +3917,9 @@ async def test_shrink_index_with_shrink_node(self, sleep, es): @mock.patch("asyncio.sleep", return_value=as_future()) @run_async async def test_shrink_index_derives_shrink_node(self, sleep, es): + es.indices.get.return_value = as_future({ + "src": {} + }) # cluster health API es.cluster.health.return_value = as_future({ "status": "green", @@ -3989,6 +3996,101 @@ async def test_shrink_index_derives_shrink_node(self, sleep, es): } }) + @mock.patch("elasticsearch.Elasticsearch") + # To avoid real sleeps in unit tests + @mock.patch("asyncio.sleep", return_value=as_future()) + @run_async + async def test_shrink_index_pattern_with_shrink_node(self, sleep, 
es): + es.indices.get.return_value = as_future({ + "src1": {}, "src2": {}, "src-2020": {} + }) + # cluster health API + es.cluster.health.return_value = as_future({ + "status": "green", + "relocating_shards": 0 + }) + es.indices.put_settings.return_value = as_future() + es.indices.shrink.return_value = as_future() + + r = runner.ShrinkIndex() + params = { + "source-index": "src*", + "target-index": "target", + "target-body": { + "settings": { + "index.number_of_replicas": 2, + "index.number_of_shards": 0 + } + }, + "shrink-node": "rally-node-0" + } + + await r(es, params) + + es.indices.put_settings.assert_has_calls([ + mock.call(index="src1", + body={ + "settings": { + "index.routing.allocation.require._name": "rally-node-0", + "index.blocks.write": "true" + } + }, + preserve_existing=True), + mock.call(index="src2", + body={ + "settings": { + "index.routing.allocation.require._name": "rally-node-0", + "index.blocks.write": "true" + } + }, + preserve_existing=True), + mock.call(index="src-2020", + body={ + "settings": { + "index.routing.allocation.require._name": "rally-node-0", + "index.blocks.write": "true" + } + }, + preserve_existing=True)]) + + es.cluster.health.assert_has_calls([ + mock.call(index="src1", params={"wait_for_no_relocating_shards": "true"}), + mock.call(index="target1", params={"wait_for_no_relocating_shards": "true"}), + mock.call(index="src2", params={"wait_for_no_relocating_shards": "true"}), + mock.call(index="target2", params={"wait_for_no_relocating_shards": "true"}), + mock.call(index="src-2020", params={"wait_for_no_relocating_shards": "true"}), + mock.call(index="target-2020", params={"wait_for_no_relocating_shards": "true"}), + ]) + + es.indices.shrink.assert_has_calls([ + mock.call( + index="src1", target="target1", body={ + "settings": { + "index.number_of_replicas": 2, + "index.number_of_shards": 0, + "index.routing.allocation.require._name": None, + "index.blocks.write": None + } + }), + mock.call( + index="src2", 
target="target2", body={ + "settings": { + "index.number_of_replicas": 2, + "index.number_of_shards": 0, + "index.routing.allocation.require._name": None, + "index.blocks.write": None + } + }), + mock.call( + index="src-2020", target="target-2020", body={ + "settings": { + "index.number_of_replicas": 2, + "index.number_of_shards": 0, + "index.routing.allocation.require._name": None, + "index.blocks.write": None + } + })]) + class PutSettingsTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @@ -4603,3 +4705,16 @@ async def test_retries_until_success(self): self.assertEqual(success_return_value, result) delegate.assert_has_calls([mock.call(es, params) for _ in range(failure_count + 1)]) + + +class RemovePrefixTests(TestCase): + def test_remove_matching_prefix(self): + suffix = runner.remove_prefix("index-20201117", "index") + + self.assertEqual(suffix, "-20201117") + + def test_prefix_doesnt_exit(self): + index_name = "index-20201117" + suffix = runner.remove_prefix(index_name, "unrelatedprefix") + + self.assertEqual(suffix, index_name)