Allow index pattern for source-index in shrink-index operation #1118

Merged
merged 3 commits into from
Nov 20, 2020
Changes from 1 commit
22 changes: 20 additions & 2 deletions docs/track.rst
@@ -1302,8 +1302,8 @@ shrink-index

With the operation ``shrink-index`` you can execute the `shrink index API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html>`_. Note that this does not correspond directly to the shrink index API call in Elasticsearch but it is a high-level operation that executes all the necessary low-level operations under the hood to shrink an index. It supports the following parameters:

* ``source-index`` (mandatory): The name of the index that should be shrunk.
* ``target-index`` (mandatory): The name of the index that contains the shrunken shards.
* ``source-index`` (mandatory): The name of the index that should be shrunk. Multiple indices can be defined using the `Multi-target syntax <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_.
* ``target-index`` (mandatory): The name of the index that contains the shrunken shards. If multiple indices match ``source-index``, the common prefix will be appended to ``target-index``.
Member:

Isn't it the other way around? The common prefix will be removed and instead the unique parts of each source index name will be appended to the string specified via target-index?

Contributor Author:

Of course 🤦! Addressed in a3cab0e.

* ``target-body`` (mandatory): The body containing settings and aliases for ``target-index``.
* ``shrink-node`` (optional, defaults to a random data node): As a first step, the source index needs to be fully relocated to a single node. Rally will automatically choose a random data node in the cluster but you can choose one explicitly if needed.

@@ -1325,6 +1325,24 @@ Example::

This will shrink the index ``src`` to ``target``. The target index will consist of one shard and have one replica. With ``shrink-node`` we also explicitly specify the name of the node to which we want the source index to be relocated.

In the following example, ``src*`` matches the indices ``src-a`` and ``src-b``::

{
"operation-type": "shrink-index",
"shrink-node": "rally-node-0",
"source-index": "src*",
"target-index": "target",
"target-body": {
"settings": {
"index.number_of_replicas": 1,
"index.number_of_shards": 1,
"index.codec": "best_compression"
}
}
}

and will shrink ``src-a`` into ``target-a`` and ``src-b`` into ``target-b``.
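As a rough illustration of the naming scheme, the unique remainder of each matched source index name (after stripping the longest common prefix) is appended to ``target-index``. A minimal sketch, with a hypothetical helper name ``target_names``; note that ``os.path.commonprefix`` matches character by character, so a separator such as ``-`` can end up inside the stripped prefix:

```python
from os.path import commonprefix

def target_names(source_indices, target_index):
    # hypothetical helper: strip the longest common character prefix of
    # the matched source indices, then append each unique remainder to
    # the configured target-index
    stem = commonprefix(source_indices)
    return [target_index + name[len(stem):] for name in source_indices]
```

For a single matched index the remainder is empty and ``target-index`` is used unchanged.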

This operation is :ref:`retryable <track_operations>`.

delete-ml-datafeed
Expand Down
75 changes: 47 additions & 28 deletions esrally/driver/runner.py
@@ -23,6 +23,8 @@
import types
from collections import Counter, OrderedDict
from copy import deepcopy
from os.path import commonprefix
dliappis marked this conversation as resolved.

import ijson

from esrally import exceptions, track
@@ -301,8 +303,15 @@ def mandatory(params, key, op):
try:
return params[key]
except KeyError:
raise exceptions.DataError("Parameter source for operation '%s' did not provide the mandatory parameter '%s'. Please add it to your"
" parameter source." % (str(op), key))
raise exceptions.DataError(
f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. "
f"Add it to your parameter source and try again.")
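The rewritten ``mandatory`` helper can be exercised in isolation; here is a minimal sketch with a stand-in ``DataError`` (the real exception lives in ``esrally.exceptions``):

```python
class DataError(Exception):
    # stand-in for esrally.exceptions.DataError
    pass

def mandatory(params, key, op):
    # return the value for key, or fail with a descriptive error
    try:
        return params[key]
    except KeyError:
        raise DataError(
            f"Parameter source for operation '{op}' did not provide the mandatory parameter '{key}'. "
            f"Add it to your parameter source and try again.")
```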


def remove_prefix(string, prefix):
Member:

Too bad we don't have Python 3.9 as minimum version requirement as this has been recently added to the standard library. I wonder whether we should put a TODO into the code?

Contributor Author:

Yes, I noticed that too while writing this. Added a TODO in a3cab0e.

if string.startswith(prefix):
return string[len(prefix):]
return string
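As the reviewer notes, ``str.removeprefix`` (added in Python 3.9) has the same semantics as this helper; a quick sketch for comparison, assuming nothing beyond the standard library:

```python
def remove_prefix(string, prefix):
    # strip prefix only when the string actually starts with it;
    # equivalent to str.removeprefix on Python 3.9+
    if string.startswith(prefix):
        return string[len(prefix):]
    return string
```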


def escape(v):
@@ -1320,46 +1329,56 @@ async def _wait_for(self, es, idx, description):

async def __call__(self, es, params):
source_index = mandatory(params, "source-index", self)
source_indices_get = await es.indices.get(source_index)
source_indices = list(source_indices_get.keys())
source_indices_stem = commonprefix(source_indices)

target_index = mandatory(params, "target-index", self)

# we need to inject additional settings so we better copy the body
target_body = deepcopy(mandatory(params, "target-body", self))
shrink_node = params.get("shrink-node")
# Choose a random data node if none is specified
node_names = [shrink_node] if shrink_node else []
Member:

I think this makes the code a bit hard to read as you operate on two variables (node_names and shrink_node) in several branches and check shrink_node multiple times.

How about this instead?

        # Choose a random data node if none is specified
        if shrink_node:
            node_names = [shrink_node]
        else:
            node_names = []
            # choose a random data node
            node_info = await es.nodes.info()
            for node in node_info["nodes"].values():
                if "data" in node["roles"]:
                    node_names.append(node["name"])
            if not node_names:
                raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Please specify it explicitly.")

This makes it easier to reason about because there is only one top-level conditional, and then we populate node_names in each branch separately.

Contributor Author:

if not shrink_node:
node_names = []
# choose a random data node
node_info = await es.nodes.info()
for node in node_info["nodes"].values():
if "data" in node["roles"]:
node_names.append(node["name"])
if not node_names:
raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Please specify it explicitly.")

for source_index in source_indices:
shrink_node = random.choice(node_names)
self.logger.info("Using [%s] as shrink node.", shrink_node)
self.logger.info("Preparing [%s] for shrinking.", source_index)
# prepare index for shrinking
await es.indices.put_settings(index=source_index,
body={
"settings": {
"index.routing.allocation.require._name": shrink_node,
"index.blocks.write": "true"
}
},
preserve_existing=True)

self.logger.info("Waiting for relocation to finish for index [%s]...", source_index)
await self._wait_for(es, source_index, "shard relocation for index [{}]".format(source_index))
self.logger.info("Shrinking [%s] to [%s].", source_index, target_index)
if "settings" not in target_body:
target_body["settings"] = {}
target_body["settings"]["index.routing.allocation.require._name"] = None
target_body["settings"]["index.blocks.write"] = None
# kick off the shrink operation
await es.indices.shrink(index=source_index, target=target_index, body=target_body)

self.logger.info("Waiting for shrink to finish for index [%s]...", source_index)
await self._wait_for(es, target_index, "shrink for index [{}]".format(target_index))
self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, target_index)
self.logger.info("Using [%s] as shrink node.", shrink_node)
self.logger.info("Preparing [%s] for shrinking.", source_index)

# prepare index for shrinking
await es.indices.put_settings(index=source_index,
body={
"settings": {
"index.routing.allocation.require._name": shrink_node,
"index.blocks.write": "true"
}
},
preserve_existing=True)

self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index)
await self._wait_for(es, source_index, "shard relocation for index [{}]".format(source_index))
Member:
We could use an f-string here?

Contributor Author:
Done in 52f28a6

self.logger.info("Shrinking [%s] to [%s].", source_index, target_index)
if "settings" not in target_body:
target_body["settings"] = {}
target_body["settings"]["index.routing.allocation.require._name"] = None
target_body["settings"]["index.blocks.write"] = None
# kick off the shrink operation
index_suffix = remove_prefix(source_index, source_indices_stem)
final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix
await es.indices.shrink(index=source_index, target=final_target_index, body=target_body)

self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index)
await self._wait_for(es, final_target_index, "shrink for index [{}]".format(final_target_index))
Contributor Author:
We could run things in parallel by moving this to a separate loop later; however, I am not sure whether a high level of concurrency for shrink operations would actually be beneficial, or whether it would cause contention and provide minimal advantage (especially if only one shrink node has been provided).

Member:
Agreed, I'd not complicate things unless there is a strong benefit.

Member:
We could use an f-string here?

Contributor Author:
Done in a3cab0e

self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, final_target_index)
# ops_count is not really important for this operation...
return 1, "ops"
Member:
I think we could use the number of shrunk indices here for the operation count?

Contributor Author:
Done in a3cab0e
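Per the review exchange above, the operation count was changed in a3cab0e to reflect the number of shrunk indices. A hedged sketch of what that return value plausibly looks like — the helper name here is assumed, not taken from the diff:

```python
def shrink_ops_result(source_indices):
    # report one "op" per shrunk source index instead of a constant 1
    return len(source_indices), "ops"
```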

