-
Notifications
You must be signed in to change notification settings - Fork 313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow index pattern for source-index in shrink-index operation #1118
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
import types | ||
from collections import Counter, OrderedDict | ||
from copy import deepcopy | ||
from os.path import commonprefix | ||
dliappis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ijson | ||
|
||
from esrally import exceptions, track | ||
|
@@ -301,8 +303,17 @@ def mandatory(params, key, op): | |
try: | ||
return params[key] | ||
except KeyError: | ||
raise exceptions.DataError("Parameter source for operation '%s' did not provide the mandatory parameter '%s'. Please add it to your" | ||
" parameter source." % (str(op), key)) | ||
raise exceptions.DataError( | ||
f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. " | ||
f"Add it to your parameter source and try again.") | ||
|
||
|
||
# TODO: remove and use https://docs.python.org/3/library/stdtypes.html#str.removeprefix | ||
# once Python 3.9 becomes the minimum version | ||
def remove_prefix(string, prefix): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Too bad we don't have Python 3.9 as minimum version requirement as this has been recently added to the standard library. I wonder whether we should put a TODO into the code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I noticed that too while coding writing this. Added a TODO in a3cab0e. |
||
if string.startswith(prefix): | ||
return string[len(prefix):] | ||
return string | ||
|
||
|
||
def escape(v): | ||
|
@@ -1320,48 +1331,60 @@ async def _wait_for(self, es, idx, description): | |
|
||
async def __call__(self, es, params): | ||
source_index = mandatory(params, "source-index", self) | ||
source_indices_get = await es.indices.get(source_index) | ||
source_indices = list(source_indices_get.keys()) | ||
source_indices_stem = commonprefix(source_indices) | ||
|
||
target_index = mandatory(params, "target-index", self) | ||
|
||
# we need to inject additional settings so we better copy the body | ||
target_body = deepcopy(mandatory(params, "target-body", self)) | ||
shrink_node = params.get("shrink-node") | ||
# Choose a random data node if none is specified | ||
if not shrink_node: | ||
if shrink_node: | ||
node_names = [shrink_node] | ||
else: | ||
node_names = [] | ||
# choose a random data node | ||
node_info = await es.nodes.info() | ||
for node in node_info["nodes"].values(): | ||
if "data" in node["roles"]: | ||
node_names.append(node["name"]) | ||
if not node_names: | ||
raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Please specify it explicitly.") | ||
raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Specify it explicitly.") | ||
|
||
for source_index in source_indices: | ||
shrink_node = random.choice(node_names) | ||
self.logger.info("Using [%s] as shrink node.", shrink_node) | ||
self.logger.info("Preparing [%s] for shrinking.", source_index) | ||
# prepare index for shrinking | ||
await es.indices.put_settings(index=source_index, | ||
body={ | ||
"settings": { | ||
"index.routing.allocation.require._name": shrink_node, | ||
"index.blocks.write": "true" | ||
} | ||
}, | ||
preserve_existing=True) | ||
|
||
self.logger.info("Waiting for relocation to finish for index [%s]...", source_index) | ||
await self._wait_for(es, source_index, "shard relocation for index [{}]".format(source_index)) | ||
self.logger.info("Shrinking [%s] to [%s].", source_index, target_index) | ||
if "settings" not in target_body: | ||
target_body["settings"] = {} | ||
target_body["settings"]["index.routing.allocation.require._name"] = None | ||
target_body["settings"]["index.blocks.write"] = None | ||
# kick off the shrink operation | ||
await es.indices.shrink(index=source_index, target=target_index, body=target_body) | ||
|
||
self.logger.info("Waiting for shrink to finish for index [%s]...", source_index) | ||
await self._wait_for(es, target_index, "shrink for index [{}]".format(target_index)) | ||
self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, target_index) | ||
self.logger.info("Using [%s] as shrink node.", shrink_node) | ||
self.logger.info("Preparing [%s] for shrinking.", source_index) | ||
|
||
# prepare index for shrinking | ||
await es.indices.put_settings(index=source_index, | ||
body={ | ||
"settings": { | ||
"index.routing.allocation.require._name": shrink_node, | ||
"index.blocks.write": "true" | ||
} | ||
}, | ||
preserve_existing=True) | ||
|
||
self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index) | ||
await self._wait_for(es, source_index, f"shard relocation for index [{source_index}]") | ||
self.logger.info("Shrinking [%s] to [%s].", source_index, target_index) | ||
if "settings" not in target_body: | ||
target_body["settings"] = {} | ||
target_body["settings"]["index.routing.allocation.require._name"] = None | ||
target_body["settings"]["index.blocks.write"] = None | ||
# kick off the shrink operation | ||
index_suffix = remove_prefix(source_index, source_indices_stem) | ||
final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix | ||
await es.indices.shrink(index=source_index, target=final_target_index, body=target_body) | ||
|
||
self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index) | ||
await self._wait_for(es, final_target_index, f"shrink for index [{final_target_index}]") | ||
self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, final_target_index) | ||
# ops_count is not really important for this operation... | ||
return 1, "ops" | ||
return len(source_indices), "ops" | ||
|
||
def __repr__(self, *args, **kwargs): | ||
return "shrink-index" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much clearer, thanks!