
Adds shrink action to ISM #326

Merged: 13 commits into main, Apr 16, 2022

Conversation

@downsrob (Contributor) commented Apr 7, 2022

Issue #, if available:
#40

Description of changes:

Adds implementation for the shrink action in ISM.

The shrink action allows users to reduce the number of primary shards in their indices using ISM policies, and provides many additional safety checks to make sure the index, nodes, or even the cluster are not harmed by the action. When shrinking an index, you may provide either the number of new shards, the percentage of the original shards, or the maximum shard size for the shrunken index. While the shrink action is ongoing, the source index is set to read-only, and a copy of each of the shards in the index needs to be on a single node with at least 2*(source index size) free disk space below the high disk watermark level. The shrunken index name will, by default, consist of the source index name as a prefix and "_shrunken" as a suffix. Alternatively, you may provide a mustache template script to generate the target index name using the index name or UUID. The shrink action does not delete the source index after completion.

Configuration:

Exactly one of num_new_shards, max_shard_size, and percentage_of_source_shards is required. If you provide multiple, the action will fail.

  • num_new_shards (int): the maximum number of primary shards in the shrunken index.
  • max_shard_size (byte size value, e.g. "5gb"): the maximum size of a shard in the target index.
  • percentage_of_source_shards (double): the percentage of the original number of primary shards to shrink to; the shrunken index will have at most this fraction of the source's primary shards. This value must be between 0.0 and 1.0 exclusive.
  • target_index_name_template (optional script): if provided, the shrunken index name is generated with this mustache template script. The template may access the index and indexUuid fields from the ctx object. An example would be "target_index_name_template":{"source": "{{ctx.index}}_shrink"}
  • aliases (optional list of aliases): aliases to be added to the new target index.
  • force_unsafe (optional boolean): if set to true, the action will execute even if the source index has no replicas.
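For illustration only (this fragment is not taken from the PR), a shrink action configured with max_shard_size instead of num_new_shards might look like the following; the surrounding policy structure is the same as in the example policy below:

```json
"shrink": {
    "max_shard_size": "5gb",
    "target_index_name_template": {
        "source": "{{ctx.index}}_shrunken",
        "lang": "mustache"
    },
    "aliases": [],
    "force_unsafe": false
}
```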

Steps:

A failure in any step will result in the shrink action starting over from the first step with a blank slate.
attempt_move_shards_step:
This step performs safety checks, calculates the number of shards to shrink to, selects a node to shrink on, sets the source index to readonly, and begins moving a copy of each shard to the target node.
Checks:

  • The target index name, which the shrunken index will be created with, does not exist
  • The source index is green
  • The source index either has replicas or the policy had force_unsafe set to true
  • The shrink will not result in the shards on the target index having greater than the maximum number of documents on any shard (2^31)
  • Some node in the cluster has at least 2*(source index size) free disk space below the high disk watermark level, and a dry run of allocating one of each shard in the source index to that node passes. Additionally, only one node may be selected for a shrink action at one time.
    If all of these checks pass, the shards will be set to allocate to the selected node and the index will be set to read only.
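The disk-space portion of the node-selection check above can be sketched as follows. This is an illustrative Python sketch, not the plugin's Kotlin implementation; the inputs (free bytes per node and the high-watermark free-space floor) are hypothetical simplifications, and the real step also runs an allocation dry run per candidate node:

```python
def node_can_hold_shrink(node_free_bytes, index_size_bytes, watermark_free_floor_bytes):
    """A node qualifies if, after holding a full copy of the source index plus
    the shrunken copy (approximated here as 2x the source size), its remaining
    free space is still above the high disk watermark floor."""
    return node_free_bytes - 2 * index_size_bytes > watermark_free_floor_bytes

def select_shrink_node(nodes_free_bytes, index_size_bytes, watermark_free_floor_bytes):
    """Return the name of the first node with enough headroom, or None."""
    for name, free_bytes in nodes_free_bytes.items():
        if node_can_hold_shrink(free_bytes, index_size_bytes, watermark_free_floor_bytes):
            return name
    return None
```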

wait_for_move_shards_step:
This step waits for a copy of each shard in the source index to be moved onto the node selected in the first step. Additionally, this step waits for all replicas to be in sync with the primaries, as any shard copy may have been moved to the selected node, and shrinking using an out of sync replica would result in missing operations on the shrunken index.
Checks:

  • All source index shards are in sync
  • At least one copy of each shard was successfully routed to the selected node
  • If this step waits for longer than the configured timeout, or the default of 12 hours, the action fails.
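The two completion checks and the timeout can be sketched as below. This is an illustrative Python sketch, not the plugin code; the shard counts are assumed to come from the cluster state:

```python
DEFAULT_TIMEOUT_SECONDS = 12 * 60 * 60  # default timeout of 12 hours

def move_step_complete(copies_on_target_node, shard_ids_in_sync, num_primary_shards):
    """Done once every primary shard id has a copy on the selected node and
    every shard id is fully in sync (primary plus all replicas)."""
    return (copies_on_target_node >= num_primary_shards
            and shard_ids_in_sync >= num_primary_shards)

def move_step_timed_out(elapsed_seconds, configured_timeout=None):
    """Fail the action if the step waits longer than the configured timeout,
    falling back to the 12-hour default when none is configured."""
    timeout = DEFAULT_TIMEOUT_SECONDS if configured_timeout is None else configured_timeout
    return elapsed_seconds > timeout
```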

attempt_shrink_step:
This step verifies that the index is still green and that the selected node still has space for the shrink, and then calls the resize index api.
Checks:

  • The source index is green
  • The selected node still has enough space to shrink to
    If all of these checks pass, the shrink will begin.

wait_for_shrink_step:
This step waits until the shrunken index is created and all of its primary shards have started accepting operations.
Checks:

  • All primary shards on the shrunken index have started accepting operations
  • If this step waits for longer than the configured timeout, or the default of 12 hours, the action fails.

Example policy:

{
    "policy_id": "shrink_then_delete",
    "description": "A sample description of the policy",
    "last_updated_time": 1649990327715,
    "schema_version": 14,
    "error_notification": null,
    "default_state": "shrink",
    "states": [
        {
            "name": "shrink",
            "actions": [
                {
                    "retry": {
                        "count": 3,
                        "backoff": "exponential",
                        "delay": "1m"
                    },
                    "shrink": {
                        "num_new_shards": 1,
                        "target_index_name_template": {
                            "source": "shrunken_{{ctx.index}}",
                            "lang": "mustache"
                        },
                        "aliases": [],
                        "force_unsafe": true
                    }
                }
            ],
            "transitions": [
                {
                    "state_name": "delete"
                }
            ]
        },
        {
            "name": "delete",
            "actions": [
                {
                    "retry": {
                        "count": 3,
                        "backoff": "exponential",
                        "delay": "1m"
                    },
                    "delete": {}
                }
            ],
            "transitions": []
        }
    ],
    "ism_template": []
}

TODOs before release:

  • Unit tests for steps and util functions
  • Explicitly deny or allow a group of nodes to be considered for use.
  • Job scheduler sweeper to clean up stale locks

CheckList:

  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@codecov-commenter commented Apr 7, 2022

Codecov Report

Merging #326 (eb1eb90) into main (6259f9a) will decrease coverage by 1.28%.
The diff coverage is 53.67%.

@@             Coverage Diff              @@
##               main     #326      +/-   ##
============================================
- Coverage     76.60%   75.31%   -1.29%     
- Complexity     2050     2130      +80     
============================================
  Files           253      260       +7     
  Lines         11642    12332     +690     
  Branches       1808     1937     +129     
============================================
+ Hits           8918     9288     +370     
- Misses         1753     1992     +239     
- Partials        971     1052      +81     
Impacted Files Coverage Δ
...exstatemanagement/step/shrink/WaitForShrinkStep.kt 31.57% <31.57%> (ø)
...atemanagement/step/shrink/WaitForMoveShardsStep.kt 36.95% <36.95%> (ø)
...exstatemanagement/step/shrink/AttemptShrinkStep.kt 41.41% <41.41%> (ø)
...atemanagement/step/shrink/AttemptMoveShardsStep.kt 55.33% <55.33%> (ø)
...gement/indexstatemanagement/action/ShrinkAction.kt 66.17% <66.17%> (ø)
...agement/indexstatemanagement/ManagedIndexRunner.kt 46.08% <75.00%> (+0.12%) ⬆️
...xmanagement/indexstatemanagement/util/StepUtils.kt 77.33% <77.33%> (ø)
...ent/indexstatemanagement/util/ManagedIndexUtils.kt 76.33% <83.33%> (-1.50%) ⬇️
.../indexstatemanagement/action/ShrinkActionParser.kt 85.71% <85.71%> (ø)
...anagement/indexstatemanagement/ISMActionsParser.kt 91.66% <100.00%> (+0.14%) ⬆️
... and 9 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@dbbaughe (Contributor) left a comment:

Guessing the 2.0.0-alpha1 has the new changes you added to JS, but what is the updated 1.3 zip for?

@downsrob (Contributor Author) commented Apr 8, 2022

Guessing the 2.0.0-alpha1 has the new changes you added to JS, but what is the updated 1.3 zip for?

Just a miss on my part, thanks for calling it out.

val numNewShards: Int?,
val maxShardSize: ByteSizeValue?,
val percentageOfSourceShards: Double?,
val targetIndexSuffix: String?,
Contributor:

  1. What's the rationale for us to only allow the customer to set the suffix as opposed to the entire target index?
  2. Do users usually add a suffix or prefix for shrunk indices, i.e. shrink-foo vs foo-shrink? Any differences when attempting to query a group of indices that are mixed (non-shrunk + shrunk)?

Contributor Author:

  1. The rationale is that if you set the entire target name then you can't apply the same policy to two indices, as the target index name would collide and only one would succeed.
  2. I don't have this data point, and I can't think of a significant difference either way in how it is queried.

This made me think though, it seems like ISM would benefit from a reusable naming component which enables building a target index string with date math, suffix, prefix, etc so we could add it to some actions, rollups and transforms in a quick and easy way.

Contributor:

We can also make this mustache template similar to snapshot action

@thalurur (Contributor) commented Apr 12, 2022:

Since we are making this a suffix instead of a prefix, any index-pattern-based policy application in the cluster will still be applied to the index. However, this can result in the shrink action being run on the newly created index (this might be an issue if, say, the shrink action shrank an index by 50%, then the newly created index matched the shrink policy and was shrunk by 50% again, and so on). We could set the auto-manage setting to false on the shrunken indices created by an ISM policy.

@downsrob (Contributor Author) commented Apr 12, 2022:

@thalurur that is a fair point, but that seems like an annoyance to have a break in your management cycle where you need to manually apply policies to shrunken indices. A user could always use prefix matching instead of suffix matching for ISM policies I think, so *logs instead of logs*, and that wouldn't match after the suffix addition.

I would prefer to just do a mustache template instead, as you mentioned. I will look into that and see if I can get it into this PR or if I will have to add it later.

Contributor Author:

I've updated the suffix field to be a mustache template.

class ShrinkAction(
val numNewShards: Int?,
val maxShardSize: ByteSizeValue?,
val percentageOfSourceShards: Double?,
Contributor:

What's the behavior when this doesn't output an integer when determining the number of shards?

Contributor Author:

I need to update the PR comment on this, but Shrink action has the requirement that if you shrink an index from x primary shards to y primary shards, then y must be a factor of x.

  • For numNewShards, we shrink to the greatest factor of x which is less than the numNewShards input.
  • For percentageOfSourceShards we use the greatest factor of x which is less than floor(percentageOfSourceShards * x).
  • For maxShardSize we divide the index size by the max shard size to get the minimum number of shards to shrink to without violating the maxShardSize constraint, and then use the minimum factor of x which is greater than that.
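The three strategies above can be sketched as follows. This is an illustrative Python sketch of the described logic, not the plugin's Kotlin implementation; I treat the numNewShards bound as inclusive, matching its description as a maximum, which is an assumption on my part:

```python
import math

def greatest_factor_at_most(n, k):
    """Greatest factor of n that is <= k (k clamped into [1, n])."""
    k = max(1, min(k, n))
    for i in range(k, 0, -1):
        if n % i == 0:
            return i

def min_factor_at_least(n, k):
    """Smallest factor of n that is >= k; if k >= n, fall back to n itself
    (the no-shrink catch described in the discussion below)."""
    if k >= n:
        return n
    for i in range(max(k, 1), n + 1):
        if n % i == 0:
            return i

def target_shard_count(source_shards, num_new_shards=None,
                       percentage_of_source_shards=None,
                       max_shard_size_bytes=None, index_size_bytes=None):
    """Exactly one sizing option is expected to be set."""
    if num_new_shards is not None:
        return greatest_factor_at_most(source_shards, num_new_shards)
    if percentage_of_source_shards is not None:
        return greatest_factor_at_most(
            source_shards, math.floor(percentage_of_source_shards * source_shards))
    # max_shard_size: minimum shard count that keeps every shard under the cap
    min_shards = math.ceil(index_size_bytes / max_shard_size_bytes)
    return min_factor_at_least(source_shards, min_shards)
```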

@Suppress("LongParameterList")
class ShrinkAction(
val numNewShards: Int?,
val maxShardSize: ByteSizeValue?,
Contributor:

What happens if someone sets this to a lower number than the current averaged shard sizes and the logic for determining new shards is larger than current shards?

Contributor Author:

Sure, so say an index is 1gb and has 10 primary shards and someone says, give me a max primary shard size of 10 bytes, that would come up with over 100 shards minimum. We would then try to calculate the minimum factor of 10 which is greater than ~100 shards.
We have a catch for this though, and it will just return the same number of shards as the source (10).

/*
 * Returns the smallest number which is >= k and is a factor of n. In the context of the shrink action,
 * n is the original number of shards, k is the attempted number of shards to shrink to.
 */
@SuppressWarnings("ReturnCount")
private fun getMinFactorGreaterThan(n: Int, k: Int): Int {
    if (k >= n) return n
    // Walk up from k; n itself is always a factor, so this always returns.
    for (i in k until n) {
        if (n % i == 0) return i
    }
    return n
}

I believe that at this point the action would continue as normal and still allocate all of the shards to a node and then duplicate the index with the same number of shards as the source. Do you have any thoughts on what the user experience should be for that case?

Contributor Author:

BTW we currently do a no-op if the source index only has one shard. I can see that may be unexpected as the target index with the suffix wouldn't be created. I am curious what your thoughts are for the user experience for that as well


@Suppress("LongParameterList")
class ShrinkAction(
val numNewShards: Int?,
Contributor:

This question might get answered down below, but asking here just to have it as I read through - what happens if they set this higher than current num shards?

Contributor Author:

Same as above. We would generate a new index with the same number of shards as the source index.

}
}
val suitableNodes: ArrayList<String> = ArrayList()
// For each node, do a dry run of moving all shards to the node to make sure there is enough space.
Contributor:

What's the difference between this and the above getNodeFreeMemoryAfterShrink check?

Contributor Author:

If the allocation dry run succeeds, it means that trying to allocate the index to the node would succeed without any blocks from settings or anything weird with the cluster. If the node is full but some reallocation of shards could make it work, the dry run would still pass. It does not take into account the fact that the index we are moving onto that node is about to be duplicated, so we do an additional check in getNodeFreeMemoryAfterShrink to make sure that the node will still be below the high watermark following the duplication by the shrink action. Adding some comments in the code to clarify.

Member:

It seems we are comparing memory usage to disk size. I am not sure if this makes sense.
For example, if the index is 20gb, are we requiring there are 40gb free memory?

@thalurur (Contributor) commented Apr 14, 2022:

I might be wrong, but I believe in the NodeStats class, OSStats.mem refers to disk usage and JVMStats.mem refers to heap usage (I don't see documentation calling it out explicitly) https://www.javadoc.io/static/org.elasticsearch/elasticsearch/0.17.2/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.html

Comment on lines +388 to +447
// If we couldn't get the job interval for the lock, use the default of 12 hours.
// Lock is 3x + 30 minutes the job interval to allow the next step's execution to extend the lock without losing it.
// If user sets maximum jitter, it could be 2x the job interval before the next step is executed.
private fun getShrinkLockDuration(jobInterval: Long?) = jobInterval?.let { (it * JOB_INTERVAL_LOCK_MULTIPLIER) + LOCK_BUFFER_SECONDS }
?: DEFAULT_LOCK_INTERVAL
Contributor:

Where is 3x + 30 minutes from?
The default in open source is 5 minutes and people can change it to 1 minute; any issue w/ the shrink lock duration being short lived in that case?

Contributor Author:

It is perhaps somewhat excessive; here was my rationale:
With max jitter it can be 2x the job interval between executions (other cases in the runner may result in skipped executions, I believe; I don't know how common those are, so maybe it is worth bumping it up). The lock is acquired at the start of the job, so there is also the run time of the job itself: with the timeouts of all of the API calls in the attemptMoveShardsStep, that can definitely be a few minutes (though that is again pretty unlikely), plus the overhead of the runner logic before the step starts.
A tighter estimate would be 2x + 5 or 6 minutes, but I decided to give it some padding. I think the only time we expect the lock to expire is if the job is deleted or paused between executions, and a tighter bound is better in that case. What do you think?
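The lock-duration formula being discussed can be sketched as below; an illustrative Python sketch with constant values taken from the quoted code's comments, assuming the interval is in seconds:

```python
JOB_INTERVAL_LOCK_MULTIPLIER = 3
LOCK_BUFFER_SECONDS = 30 * 60          # 30 minutes of padding
DEFAULT_LOCK_INTERVAL = 12 * 60 * 60   # 12 hours, used when the interval is unknown

def shrink_lock_duration(job_interval_seconds=None):
    """Lock duration = 3x the job interval + 30 minutes, so the next step's
    execution can extend the lock even with maximum jitter (2x the interval)."""
    if job_interval_seconds is None:
        return DEFAULT_LOCK_INTERVAL
    return job_interval_seconds * JOB_INTERVAL_LOCK_MULTIPLIER + LOCK_BUFFER_SECONDS
```

With the open-source default 5-minute interval this yields a 45-minute lock, comfortably above the 2x-jitter worst case of 10 minutes between executions.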

Comment on lines 100 to 105
val suitableNodes = findSuitableNodes(context, statsResponse, indexSize)

// Get the job interval to use in determining the lock length
val interval = getJobIntervalSeconds(context.metadata.indexUuid, client)
// iterate through the nodes and try to acquire a lock on one
val lockToNodeName: Pair<LockModel, String>? = acquireLockFromNodeList(context.jobContext, suitableNodes, interval)
Contributor:

For these "suitable nodes", I looked through the logic and it seems like it might include dedicated masters which we wouldn't want (or I missed something that is excluding them)?

Contributor:

Might also be useful for an admin to explicitly deny a group of nodes from being used or set a group of nodes which can only be considered for use.

Comment on lines +558 to +562
withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, configSource, XContentType.JSON)
managedIndexConfig = xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, ManagedIndexConfig.Companion::parse)
}
}
Contributor:

I don't remember off the top of my head, but how do exceptions thrown in withContext blocks behave? Do they just get thrown up the calling stack? (i.e. what happens if an exception is thrown while parsing here)

Contributor Author:

Yep, in withContext an exception closes the scope and then gets thrown up the calling stack. To double check, I tested forcing an IOException in this spot and it behaved as expected.

Comment on lines +192 to +205
logger.error(TOO_MANY_DOCS_FAILURE_MESSAGE)
fail(TOO_MANY_DOCS_FAILURE_MESSAGE)
Contributor:

minor: it seems like we log and call fail() in multiple places; instead we could just log inside the fail() method

stepStatus = StepStatus.FAILED
// Non-null assertion !! is used to throw an exception on null which would just be caught and logged
try {
resetReadOnlyAndRouting(context!!.metadata.index, context!!.client, shrinkActionProperties!!.originalIndexSettings)
@thalurur (Contributor) commented Apr 14, 2022:

nit: can we avoid the !!? I understand we are catching the NPEs, but to keep consistency with the repo, can we instead pass context and shrinkActionProperties to this cleanupAndFail to avoid this?

}

// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null, e: Exception? = null) {
Contributor:

This seems to be repeated; can we instead have it as a common method, potentially taking a cleanup-target-index parameter to determine which steps should do that additional action?

}
}
}
if (numShardsOnNode >= numPrimaryShards && numShardsInSync >= numPrimaryShards) {
Contributor:

I am a little confused by the second condition: why are we checking that the number of in-sync shards is at least the number of primary shards, rather than that the in-sync shards equal the total number of shards?

Contributor Author:

numShardsInSync represents the number of shard IDs which are in sync (all replicas with the primary). You can see above that it gets incremented if there are no replicas or if all of the replicas are in sync with the primary. Both of these >= checks could be ==, and likely always will be. I usually think of >= as more resilient to unknown code bugs, which is why I do that. If that is bad style, let me know.

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetadata.actionMetaData
// If we succeeded because there was only one source primary shard, we no-op by skipping to the last step
val stepMetaData = if (info?.get("message") == ONE_PRIMARY_SHARD_MESSAGE) {
Member:

Do we need to depend on info.message here, or just use a onePrimaryShard variable in this class?

For the comment, skipping to the next action would be clearer.

Comment on lines +72 to +81
if (routingInfo.primary()) {
if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) {
numShardsOnNode++
}
// Either there must be no replicas (force unsafe must have been set) or all replicas must be in sync as
// it isn't known which shard (any replica or primary) will be moved to the target node and used in the shrink.
if (numReplicas == 0 || inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) {
numShardsInSync++
}
}
Member:

For routingInfo.primary(), does this mean we only check the routing info for primaries?
If so, this seems to check whether all primary shards are on the target node, not whether primary and replica shards are on the target node.

@bowenlan-amzn (Member):

a single node with at least 2*(source index size) free disk space below the high disk watermark level

2*(source index size) does not seem good enough to me; we need to relocate the index to this node, and then the shrunken index will also be on this node, so the full 2*(source index size) will be consumed.

3x would make more sense to me. What do you think @thalurur @dbbaughe?

@thalurur (Contributor):

a single node with at least 2*(source index size) free disk space below the high disk watermark level

2*(source index size) does not seem good enough to me; we need to relocate the index to this node, and then the shrunken index will also be on this node, so the full 2*(source index size) will be consumed.

3x would make more sense to me. What do you think @thalurur @dbbaughe?

I think we already have a buffer added for the high disk usage threshold: 2x + threshold. We could be conservative with 3x, but that might result in false failures of the action despite having enough space, especially for larger indices. We could add a buffer threshold as a setting if we want to make this more customizable on top of 2x + disk threshold.

@downsrob force-pushed the development-shrink-upgraded branch from 3a510f1 to eb1eb90 on April 15, 2022 at 02:53
@downsrob downsrob merged commit ab1e821 into opensearch-project:main Apr 16, 2022
@downsrob downsrob deleted the development-shrink-upgraded branch April 16, 2022 00:03
wuychn pushed a commit to ochprince/index-management that referenced this pull request Mar 16, 2023
* Updates shrink action to new interface

Signed-off-by: Clay Downs <[email protected]>
@Angie-Zhang Angie-Zhang mentioned this pull request Apr 5, 2023