diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
index da8d379f1..cce120339 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
@@ -132,8 +132,7 @@ class ShrinkAction(
         const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template"
         const val ALIASES_FIELD = "aliases"
         const val FORCE_UNSAFE_FIELD = "force_unsafe"
-        const val LOCK_RESOURCE_TYPE = "shrink"
-        const val LOCK_RESOURCE_NAME = "node_name"
+        const val LOCK_SOURCE_JOB_ID = "shrink-node_name"
         fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
     }
 }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
index 6d1764e4c..a08e5beaf 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
@@ -28,9 +28,9 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
 import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
 import org.opensearch.indexmanagement.indexstatemanagement.util.getIntervalFromManagedIndexConfig
 import org.opensearch.indexmanagement.indexstatemanagement.util.getManagedIndexConfig
-import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
+import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeDiskSpaceAfterShrink
 import org.opensearch.indexmanagement.indexstatemanagement.util.getShardIdToNodeNameSet
-import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockID
+import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkJobID
 import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
 import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
 import org.opensearch.indexmanagement.opensearchapi.convertToMap
@@ -60,43 +60,66 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
     override suspend fun wrappedExecute(context: StepContext): AttemptMoveShardsStep {
         val client = context.client
         val indexName = context.metadata.index
-        val shrinkTargetIndexName =
-            compileTemplate(action.targetIndexTemplate, context.metadata, indexName + DEFAULT_TARGET_SUFFIX, context.scriptService)
-
-        if (targetIndexNameIsInvalid(context.clusterService, shrinkTargetIndexName)) return this
+        // Only an index with green health can be shrunk
         if (!isIndexGreen(client, indexName)) {
             info = mapOf("message" to INDEX_NOT_GREEN_MESSAGE)
             stepStatus = StepStatus.CONDITION_NOT_MET
             return this
         }
 
-        if (shouldFailUnsafe(context.clusterService, indexName)) return this
+        // Check if the target index name is valid
+        val shrinkTargetIndexName =
+            compileTemplate(
+                action.targetIndexTemplate,
+                context.metadata,
+                indexName + DEFAULT_TARGET_SUFFIX,
+                context.scriptService
+            )
+        if (targetIndexNameIsInvalid(context.clusterService, shrinkTargetIndexName)) return this
 
-        // If there is only one primary shard we complete the step and in getUpdatedManagedIndexMetadata will start a no-op
-        val numOriginalShards = context.clusterService.state().metadata.indices[indexName].numberOfShards
-        if (numOriginalShards == 1) {
-            info = mapOf("message" to ONE_PRIMARY_SHARD_MESSAGE)
-            stepStatus = StepStatus.COMPLETED
-            return this
-        }
+        if (shouldFailUnsafe(context.clusterService, indexName)) return this
 
         // Get stats on index size and docs
         val (statsStore, statsDocs, shardStats) = getIndexStats(indexName, client) ?: return this
         val indexSize = statsStore.sizeInBytes
+        // Compute the current and target shard counts
+        val numOriginalShards = context.clusterService.state().metadata.indices[indexName].numberOfShards
         val numTargetShards = getNumTargetShards(numOriginalShards, indexSize)
 
         if (shouldFailTooManyDocuments(statsDocs, numTargetShards)) return this
 
-        val originalIndexSettings = getOriginalSettings(indexName, context.clusterService)
+        // If there is only one primary shard we complete the step, and getUpdatedManagedIndexMetadata will start a no-op
+        if (numOriginalShards == 1) {
+            info = mapOf("message" to ONE_PRIMARY_SHARD_MESSAGE)
+            stepStatus = StepStatus.COMPLETED
+            return this
+        }
+        // If the source index already has the target number of shards, we complete the step
+        if (numOriginalShards == numTargetShards) {
+            info = mapOf("message" to NO_SHARD_COUNT_CHANGE_MESSAGE)
+            stepStatus = StepStatus.COMPLETED
+            return this
+        }
 
-        // get the nodes with enough memory in increasing order of free space
-        val suitableNodes = findSuitableNodes(context, shardStats, indexSize)
+        // Get the original index settings; WaitForShrinkStep.resetReadOnlyAndRouting resets the write block and routing node after the shrink completes.
+        val originalIndexSettings = getOriginalSettings(indexName, context.clusterService)
 
         // Get the job interval to use in determining the lock length
         val interval = getJobIntervalSeconds(context.metadata.indexUuid, client)
-        // iterate through the nodes and try to acquire a lock on one
-        val (lock, nodeName) = acquireLockFromNodeList(context.lockService, suitableNodes, interval, indexName) ?: return this
+
+        // Get candidate nodes for the shrink
+        val suitableNodes = findSuitableNodes(context, shardStats, indexSize)
+
+        if (suitableNodes.isEmpty()) {
+            info = mapOf("message" to NO_AVAILABLE_NODES_MESSAGE)
+            stepStatus = StepStatus.CONDITION_NOT_MET
+            return this
+        }
+
+        // Iterate through the suitable nodes and try to acquire a lock on one
+        val (lock, nodeName) = acquireLockFromNodeList(context.lockService, suitableNodes, interval, indexName)
+            ?: return this
         shrinkActionProperties = ShrinkActionProperties(
             nodeName,
             shrinkTargetIndexName,
@@ -116,7 +139,10 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
 
     override fun getGenericFailureMessage(): String = FAILURE_MESSAGE
 
-    private suspend fun getIndexStats(indexName: String, client: Client): Triple<StoreStats, DocsStats, Array<ShardStats>>? {
+    private suspend fun getIndexStats(
+        indexName: String,
+        client: Client
+    ): Triple<StoreStats, DocsStats, Array<ShardStats>>? {
         val statsRequest = IndicesStatsRequest().indices(indexName)
         val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil {
             stats(statsRequest, it)
@@ -125,7 +151,10 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         val statsDocs = statsResponse.total.docs
         val statsShards = statsResponse.shards
         if (statsStore == null || statsDocs == null || statsShards == null) {
-            fail(FAILURE_MESSAGE, "Failed to move shards in shrink action as IndicesStatsResponse was missing some stats.")
+            setStepFailed(
+                FAILURE_MESSAGE,
+                "Failed to move shards in shrink action as IndicesStatsResponse was missing some stats."
+            )
             return null
         }
         return Triple(statsStore, statsDocs, statsShards)
@@ -172,7 +201,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         val totalDocs: Long = docsStats.count
         val docsPerTargetShard: Long = totalDocs / numTargetShards
         if (docsPerTargetShard > MAXIMUM_DOCS_PER_SHARD) {
-            fail(TOO_MANY_DOCS_FAILURE_MESSAGE, TOO_MANY_DOCS_FAILURE_MESSAGE)
+            setStepFailed(TOO_MANY_DOCS_FAILURE_MESSAGE, TOO_MANY_DOCS_FAILURE_MESSAGE)
             return true
         }
         return false
@@ -190,7 +219,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         val shouldFailForceUnsafeCheck = numReplicas == 0
         if (shouldFailForceUnsafeCheck) {
             logger.info(UNSAFE_FAILURE_MESSAGE)
-            fail(UNSAFE_FAILURE_MESSAGE)
+            setStepFailed(UNSAFE_FAILURE_MESSAGE)
             return true
         }
         return false
@@ -200,17 +229,23 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         val indexExists = clusterService.state().metadata.indices.containsKey(shrinkTargetIndexName)
         if (indexExists) {
             val indexExistsMessage = getIndexExistsMessage(shrinkTargetIndexName)
-            fail(indexExistsMessage, indexExistsMessage)
+            setStepFailed(indexExistsMessage, indexExistsMessage)
             return true
         }
-        val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) }
+        val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason ->
+            InvalidIndexNameException(index_name, reason)
+        }
         // If the index name is invalid for any reason, this will throw an exception giving the reason why in the message.
         // That will be displayed to the user as the cause.
         validateIndexOrAliasName(shrinkTargetIndexName, exceptionGenerator)
         return false
     }
 
-    private suspend fun setToReadOnlyAndMoveIndexToNode(stepContext: StepContext, node: String, lock: LockModel): Boolean {
+    private suspend fun setToReadOnlyAndMoveIndexToNode(
+        stepContext: StepContext,
+        node: String,
+        lock: LockModel
+    ): Boolean {
         val updateSettings = Settings.builder()
             .put(SETTING_BLOCKS_WRITE, true)
             .put(ROUTING_SETTING, node)
@@ -223,10 +258,10 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         } finally {
             isUpdateAcknowledged = response != null && response.isAcknowledged
             if (!isUpdateAcknowledged) {
-                fail(UPDATE_FAILED_MESSAGE, UPDATE_FAILED_MESSAGE)
+                setStepFailed(UPDATE_FAILED_MESSAGE, UPDATE_FAILED_MESSAGE)
                 val released: Boolean = lockService.suspendUntil { release(lock, it) }
                 if (!released) {
-                    logger.error("Failed to release Shrink action lock on node [$node]")
+                    logger.error("Failed to release Shrink action lock on node: [$node]")
                 }
             }
         }
@@ -244,45 +279,63 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         indexName: String
     ): Pair<LockModel, String>? {
         for (nodeName in suitableNodes) {
-            val lockID = getShrinkLockID(nodeName)
+            val lockID = getShrinkJobID(nodeName)
             val lock: LockModel? = lockService.suspendUntil {
                 acquireLockWithId(INDEX_MANAGEMENT_INDEX, getShrinkLockDuration(jobIntervalSeconds), lockID, it)
             }
             if (lock != null) {
                 return lock to nodeName
+            } else {
+                logger.info("Shrink action could not acquire a lock on node [$nodeName] for [$indexName].")
             }
         }
-        logger.info("Shrink action could not find available node to shrink onto for index [$indexName].")
-        info = mapOf("message" to NO_AVAILABLE_NODES_MESSAGE)
+        info = mapOf("message" to NO_UNLOCKED_NODES_MESSAGE)
         stepStatus = StepStatus.CONDITION_NOT_MET
         return null
     }
 
     /*
-     * Returns the list of node names for nodes with enough space to shrink to, in increasing order of space available
+     * Returns the list of nodes available for the shrink, in increasing order of available space
     */
-    @SuppressWarnings("NestedBlockDepth", "ComplexMethod")
+    @SuppressWarnings("NestedBlockDepth", "ComplexMethod", "LongMethod")
     private suspend fun findSuitableNodes(
         stepContext: StepContext,
         shardStats: Array<ShardStats>,
         indexSizeInBytes: Long
     ): List<String> {
         val nodesStatsReq = NodesStatsRequest().addMetric(FS_METRIC)
-        val nodeStatsResponse: NodesStatsResponse = stepContext.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
+        val nodeStatsResponse: NodesStatsResponse = stepContext.client.admin().cluster().suspendUntil {
+            nodesStats(nodesStatsReq, it)
+        }
         val nodesList = nodeStatsResponse.nodes.filter { it.node.isDataNode }
+        val suitableNodes: ArrayList<String> = ArrayList()
+
         // Sort in increasing order of keys, in our case this is disk space remaining
-        val comparator = kotlin.Comparator { o1: Tuple<Long, String>, o2: Tuple<Long, String> -> o1.v1().compareTo(o2.v1()) }
+        val comparator = kotlin.Comparator { o1: Tuple<Long, String>, o2: Tuple<Long, String> ->
+            o1.v1().compareTo(o2.v1())
+        }
         val nodesWithSpace = PriorityQueue(comparator)
         for (node in nodesList) {
-            // Gets the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes,
+            // Gets the amount of disk space in the node which will be free below the high watermark level after adding 2*indexSizeInBytes,
             // as the source index is duplicated during the shrink
-            val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.clusterService.clusterSettings)
-            if (remainingMem > 0L) {
-                nodesWithSpace.add(Tuple(remainingMem, node.node.name))
+            val remainingDiskSpace = getNodeFreeDiskSpaceAfterShrink(
+                node,
+                indexSizeInBytes,
+                stepContext.clusterService.clusterSettings
+            )
+            if (remainingDiskSpace > 0L) {
+                nodesWithSpace.add(Tuple(remainingDiskSpace, node.node.name))
            }
         }
-        val shardIdToNodeList: Map<Int, Set<String>> = getShardIdToNodeNameSet(shardStats, stepContext.clusterService.state().nodes)
-        val suitableNodes: ArrayList<String> = ArrayList()
+        // If no node has enough disk space, return an empty list
+        if (nodesWithSpace.size < 1) {
+            logger.info("No node has enough disk space for the shrink action.")
+            return suitableNodes
+        }
+        val shardIdToNodeList: Map<Int, Set<String>> = getShardIdToNodeNameSet(
+            shardStats,
+            stepContext.clusterService.state().nodes
+        )
         // For each node, do a dry run of moving all shards to the node to make sure that there aren't any other blockers
         // to the allocation.
         for (sizeNodeTuple in nodesWithSpace) {
@@ -294,18 +347,38 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
                 val shardId = shard.shardRouting.shardId()
                 val currentShardNode = stepContext.clusterService.state().nodes[shard.shardRouting.currentNodeId()]
                 // Don't attempt a dry run for shards which have a copy already on that node
-                if (shardIdToNodeList[shardId.id]?.contains(targetNodeName) == true || requestedShardIds.contains(shardId.id)) continue
-                clusterRerouteRequest.add(MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, targetNodeName))
+                if (shardIdToNodeList[shardId.id]?.contains(targetNodeName) == true || requestedShardIds.contains(
+                        shardId.id
+                    )
+                ) continue
+                clusterRerouteRequest.add(
+                    MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, targetNodeName)
+                )
                 requestedShardIds.add(shardId.id)
             }
             val clusterRerouteResponse: ClusterRerouteResponse = stepContext.client.admin().cluster().suspendUntil { reroute(clusterRerouteRequest, it) }
-            val numYesDecisions = clusterRerouteResponse.explanations.explanations().count { it.decisions().type().equals((Decision.Type.YES)) }
-            // Should be the same number of yes decisions as the number of requests
-            if (numYesDecisions == requestedShardIds.size) {
+            val numOfDecisions = clusterRerouteResponse.explanations.explanations().size
+            val numNoDecisions = clusterRerouteResponse.explanations.explanations().count {
+                it.decisions().type().equals((Decision.Type.NO))
+            }
+            val numYesDecisions = clusterRerouteResponse.explanations.explanations().count {
+                it.decisions().type().equals((Decision.Type.YES))
+            }
+            val numThrottleDecisions = clusterRerouteResponse.explanations.explanations().count {
+                it.decisions().type().equals((Decision.Type.THROTTLE))
+            }
+            logger.debug(
+                getShardMovingDecisionInfo(numNoDecisions, numYesDecisions, numThrottleDecisions, targetNodeName)
+            )
+            // NO decisions are excluded; shards with YES and THROTTLE decisions can still be moved for the shrink.
+            if (numOfDecisions - numNoDecisions >= requestedShardIds.size) {
                suitableNodes.add(sizeNodeTuple.v2())
            }
        }
+        if (suitableNodes.size < 1) {
+            logger.info("No node has shard-moving permission for the shrink action.")
+        }
        return suitableNodes
    }
@@ -397,27 +470,40 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         const val ROUTING_SETTING = "index.routing.allocation.require._name"
         const val DEFAULT_TARGET_SUFFIX = "_shrunken"
         const val name = "attempt_move_shards_step"
-        const val UPDATE_FAILED_MESSAGE = "Shrink failed because shard settings could not be updated."
+        const val UPDATE_FAILED_MESSAGE = "Shrink action failed because shard settings could not be updated."
         const val NO_AVAILABLE_NODES_MESSAGE =
-            "There are no available nodes to move to to execute a shrink. Delaying until node becomes available."
+            "There is no node with enough disk space or shard-moving permission for the shrink action."
+        const val NO_UNLOCKED_NODES_MESSAGE =
+            "All candidate nodes for the shrink action are locked by other actions. Delaying until one becomes unlocked."
         const val UNSAFE_FAILURE_MESSAGE = "Shrink failed because index has no replicas and force_unsafe is not set to true."
-        const val ONE_PRIMARY_SHARD_MESSAGE = "Shrink action did not do anything because source index only has one primary shard."
-        const val TOO_MANY_DOCS_FAILURE_MESSAGE = "Shrink failed because there would be too many documents on each target shard following the shrink."
-        const val INDEX_NOT_GREEN_MESSAGE = "Shrink action cannot start moving shards as the index is not green."
-        const val FAILURE_MESSAGE = "Shrink failed to start moving shards."
+        const val ONE_PRIMARY_SHARD_MESSAGE = "Source index only has one primary shard. Skipping this shrink execution."
+        const val NO_SHARD_COUNT_CHANGE_MESSAGE = "Source index already has the target number of shards. Skipping this shrink execution."
+        const val TOO_MANY_DOCS_FAILURE_MESSAGE = "Shrink action failed because there would be too many documents on each target shard after the shrink."
+        const val INDEX_NOT_GREEN_MESSAGE = "Shrink action cannot continue as the index is not green."
+        const val FAILURE_MESSAGE = "Shrink action failed to start moving shards."
         private const val DEFAULT_LOCK_INTERVAL = 3L * 60L * 60L // Default lock interval is 3 hours in seconds
         private const val MILLISECONDS_IN_SECOND = 1000L
         const val THIRTY_SECONDS_IN_MILLIS = 30L * MILLISECONDS_IN_SECOND
         private const val JOB_INTERVAL_LOCK_MULTIPLIER = 3
         private const val LOCK_BUFFER_SECONDS = 1800
         private const val MAXIMUM_DOCS_PER_SHARD = 0x80000000 // The maximum number of documents per shard is 2^31
-        fun getSuccessMessage(node: String) = "Successfully started moving the shards to $node."
+        fun getSuccessMessage(node: String) = "Successfully started moving the shards to $node for the shrink action."
         fun getIndexExistsMessage(newIndex: String) = "Shrink failed because $newIndex already exists."
+        fun getShardMovingDecisionInfo(
+            noCount: Int,
+            yesCount: Int,
+            throttleCount: Int,
+            node: String
+        ) = "Shard-moving decisions on node $node, NO: $noCount, YES: $yesCount, THROTTLE: $throttleCount."
+
         // If we couldn't get the job interval for the lock, use the default of 3 hours.
         // Lock is 3x the job interval plus 30 minutes, to allow the next step's execution to extend the lock without losing it.
         // If user sets maximum jitter, it could be 2x the job interval before the next step is executed.
-        private fun getShrinkLockDuration(jobInterval: Long?) = jobInterval?.let { (it * JOB_INTERVAL_LOCK_MULTIPLIER) + LOCK_BUFFER_SECONDS }
+        private fun getShrinkLockDuration(
+            jobInterval: Long?
+        ) = jobInterval?.let { (it * JOB_INTERVAL_LOCK_MULTIPLIER) + LOCK_BUFFER_SECONDS }
             ?: DEFAULT_LOCK_INTERVAL
+
         private val ALLOWED_TEMPLATE_FIELDS = setOf("index", "indexUuid")
     }
 }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
index 023aec502..87dba6361 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
@@ -11,11 +11,14 @@ import org.opensearch.action.admin.indices.shrink.ResizeRequest
 import org.opensearch.action.admin.indices.shrink.ResizeResponse
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
+import org.opensearch.action.support.master.AcknowledgedResponse
+import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.common.settings.Settings
 import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
 import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
-import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
+import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeDiskSpaceAfterShrink
 import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
+import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -29,15 +32,18 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
     override suspend fun wrappedExecute(context: StepContext): AttemptShrinkStep {
         val indexName = context.metadata.index
         // If the returned shrinkActionProperties are null, then the status has been set to failed, just return
-        val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
+        val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this
 
         if (!isIndexGreen(context.client, indexName)) {
             stepStatus = StepStatus.CONDITION_NOT_MET
             info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
             return this
         }
+
         if (!isNodeStillSuitable(localShrinkActionProperties.nodeName, indexName, context)) return this
 
+        if (!confirmIndexWriteBlock(context, indexName)) return this
+
         // If the resize index API fails, the step will be set to failed and resizeIndex will return false
         if (!resizeIndex(indexName, localShrinkActionProperties, context)) return this
         info = mapOf("message" to getSuccessMessage(localShrinkActionProperties.targetIndexName))
@@ -69,7 +75,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
             cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as node stats were missing the previously selected node.")
             return false
         }
-        val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
+        val remainingMem = getNodeFreeDiskSpaceAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
         if (remainingMem < 1L) {
             cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE, NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
             return false
@@ -77,6 +83,26 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
         return true
     }
 
+    // Set the index write block again before sending the shrink request, in case the write block was flipped by another process in a previous step.
+    private suspend fun confirmIndexWriteBlock(stepContext: StepContext, indexName: String): Boolean {
+        val updateSettings = Settings.builder()
+            .put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
+            .build()
+        var response: AcknowledgedResponse? = null
+        val isUpdateAcknowledged: Boolean
+
+        try {
+            response = issueUpdateSettingsRequest(stepContext.client, stepContext.metadata.index, updateSettings)
+        } finally {
+            isUpdateAcknowledged = response != null && response.isAcknowledged
+        }
+
+        if (!isUpdateAcknowledged) {
+            cleanupAndFail(WRITE_BLOCK_FAILED_MESSAGE, "Failed to confirm the write block for index [$indexName] before sending the shrink request.")
+        }
+        return isUpdateAcknowledged
+    }
+
     private suspend fun resizeIndex(sourceIndex: String, shrinkActionProperties: ShrinkActionProperties, context: StepContext): Boolean {
         val targetIndex = shrinkActionProperties.targetIndexName
         val req = ResizeRequest(targetIndex, sourceIndex)
@@ -113,6 +139,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
     companion object {
         const val name = "attempt_shrink_step"
         const val FAILURE_MESSAGE = "Shrink failed when sending shrink request."
+        const val WRITE_BLOCK_FAILED_MESSAGE = "Failed to set the write block before sending the shrink request."
         const val NOT_ENOUGH_SPACE_FAILURE_MESSAGE = "Shrink failed as the selected node no longer had enough free space to shrink to."
         const val INDEX_HEALTH_NOT_GREEN_MESSAGE = "Shrink delayed because index health is not green."
         fun getSuccessMessage(newIndex: String) = "Shrink started. $newIndex currently being populated."
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt
index fe29857c0..d45845cb1 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt
@@ -55,7 +55,7 @@ abstract class ShrinkStep(
 
     protected suspend fun cleanupAndFail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
         cleanupResources(cleanupSettings, cleanupLock, cleanupTargetIndex)
-        fail(infoMessage, logMessage, cause, e)
+        setStepFailed(infoMessage, logMessage, cause, e)
     }
 
     abstract fun getGenericFailureMessage(): String
@@ -63,12 +63,11 @@ abstract class ShrinkStep(
     abstract suspend fun wrappedExecute(context: StepContext): Step
 
     @Suppress("ReturnCount")
-    protected suspend fun updateAndGetShrinkActionProperties(context: StepContext): ShrinkActionProperties? {
+    protected suspend fun checkShrinkActionPropertiesAndRenewLock(context: StepContext): ShrinkActionProperties? {
         val actionMetadata = context.metadata.actionMetaData
         var localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
-        shrinkActionProperties = localShrinkActionProperties
         if (localShrinkActionProperties == null) {
-            cleanupAndFail(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
+            setStepFailed(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
             return null
         }
         val lock = renewShrinkLock(localShrinkActionProperties, context.lockService, logger)
@@ -85,7 +84,7 @@ abstract class ShrinkStep(
         return localShrinkActionProperties
     }
 
-    protected fun fail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
+    protected fun setStepFailed(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
         if (logMessage != null) {
             if (e != null) {
                 logger.error(logMessage, e)
@@ -95,7 +94,6 @@ abstract class ShrinkStep(
         }
         info = if (cause == null) mapOf("message" to infoMessage) else mapOf("message" to infoMessage, "cause" to cause)
         stepStatus = StepStatus.FAILED
-        shrinkActionProperties = null
     }
 
     protected suspend fun cleanupResources(resetSettings: Boolean, releaseLock: Boolean, deleteTargetIndex: Boolean) {
@@ -104,6 +102,7 @@ abstract class ShrinkStep(
             if (resetSettings) resetIndexSettings(localShrinkActionProperties)
             if (deleteTargetIndex) deleteTargetIndex(localShrinkActionProperties)
             if (releaseLock) releaseLock(localShrinkActionProperties)
+            shrinkActionProperties = null
         } else {
             logger.error("Shrink action failed to clean up resources due to null shrink action properties.")
         }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
index a8dcb6fcd..e4f27686b 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
@@ -26,7 +26,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
     override suspend fun wrappedExecute(context: StepContext): WaitForMoveShardsStep {
         val indexName = context.metadata.index
         // If the returned shrinkActionProperties are null, then the status has been set to failed, just return
-        val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
+        val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this
 
         val shardStats = getShardStats(indexName, context.client) ?: return this
 
@@ -87,7 +87,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
         val response: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(indexStatsRequests, it) }
         val shardStats = response.shards
         if (shardStats == null) {
-            fail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
+            cleanupAndFail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
             return null
         }
         return shardStats
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
index 319f6d2ea..6eb783a85 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
@@ -30,7 +30,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
     override suspend fun wrappedExecute(context: StepContext): WaitForShrinkStep {
         val indexName = context.metadata.index
         // If the returned shrinkActionProperties are null, then the status has been set to failed, just return
-        val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
+        val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this
 
         val targetIndex = localShrinkActionProperties.targetIndexName
         if (shrinkNotDone(targetIndex, localShrinkActionProperties.targetNumShards, context.client, context.clusterService)) {
@@ -42,7 +42,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
         if (!clearAllocationSettings(context, targetIndex)) return this
         if (!resetReadOnlyAndRouting(indexName, context.client, localShrinkActionProperties.originalIndexSettings)) return this
 
-        if (!deleteShrinkLock(localShrinkActionProperties, context.lockService)) {
+        if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) {
             logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]")
         }
         stepStatus = StepStatus.COMPLETED
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
index 512129f88..121fae408 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
@@ -25,8 +25,7 @@ import org.opensearch.common.settings.ClusterSettings
 import org.opensearch.common.settings.Settings
 import org.opensearch.common.unit.TimeValue
 import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
-import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_NAME
-import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_TYPE
+import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_SOURCE_JOB_ID
 import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMoveShardsStep
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -52,9 +51,11 @@ suspend fun releaseShrinkLock(
 
 suspend fun deleteShrinkLock(
     shrinkActionProperties: ShrinkActionProperties,
-    lockService: LockService
+    lockService: LockService,
+    logger: Logger
 ): Boolean {
     val lockID = getShrinkLockID(shrinkActionProperties.nodeName)
+    logger.info("Deleting lock: $lockID")
     return lockService.suspendUntil { deleteLock(lockID, it) }
 }
 
@@ -94,11 +95,11 @@ fun getShrinkLockModel(
     lockSeqNo: Long,
     lockDurationSecond: Long
 ): LockModel {
-    val lockID = getShrinkLockID(nodeName)
+    val jobID = getShrinkJobID(nodeName)
     val lockCreationInstant: Instant = Instant.ofEpochSecond(lockEpochSecond)
     return LockModel(
         jobIndexName,
-        lockID,
+        jobID,
         lockCreationInstant,
         lockDurationSecond,
         false,
@@ -163,7 +164,7 @@ fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
 /*
 * Returns the amount of disk space in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1
 * if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain file system stats.
 */
-fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
+fun getNodeFreeDiskSpaceAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
     val fsStats = node.fs
     if (fsStats != null) {
         val diskSpaceLeftInNode = fsStats.total.free.bytes
@@ -202,7 +203,14 @@ suspend fun resetReadOnlyAndRouting(index: String, client: Client, originalSetti
 }
 
 fun getShrinkLockID(nodeName: String): String {
-    return "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName"
+    return LockModel.generateLockId(
+        INDEX_MANAGEMENT_INDEX,
+        getShrinkJobID(nodeName)
+    )
+}
+
+fun getShrinkJobID(nodeName: String): String {
+    return "$LOCK_SOURCE_JOB_ID-$nodeName"
 }
 
 // Creates a map of shardIds to the set of node names which the shard copies reside on. For example, with 2 replicas
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt
index 5419cf2c5..a9a671e72 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt
@@ -37,7 +37,7 @@ class StepUtilsTests : OpenSearchTestCase() {
         )
         val lockModel = getShrinkLockModel(shrinkActionProperties)
         assertEquals("Incorrect lock model job index name", INDEX_MANAGEMENT_INDEX, lockModel.jobIndexName)
-        assertEquals("Incorrect lock model jobID", getShrinkLockID(shrinkActionProperties.nodeName), lockModel.jobId)
+        assertEquals("Incorrect lock model jobID", getShrinkJobID(shrinkActionProperties.nodeName), lockModel.jobId)
         assertEquals("Incorrect lock model duration", shrinkActionProperties.lockDurationSecond, lockModel.lockDurationSeconds)
         assertEquals("Incorrect lock model lockID", "${lockModel.jobIndexName}-${lockModel.jobId}", lockModel.lockId)
         assertEquals("Incorrect lock model sequence number", shrinkActionProperties.lockSeqNo, lockModel.seqNo)
@@ -129,9 +129,9 @@ class StepUtilsTests : OpenSearchTestCase() {
         val clusterSettings = ClusterSettings(settings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
         val remainingSpace = freeBytes - ((2 * indexSize) + threshold)
         if (remainingSpace > 0) {
-            assertEquals(remainingSpace, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
+            assertEquals(remainingSpace, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
         } else {
-            assertEquals(-1L, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
+            assertEquals(-1L, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
         }
     }
 }
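Reviewer note on the lock ID change: the node-specific string that used to be assembled from LOCK_RESOURCE_TYPE and LOCK_RESOURCE_NAME is unchanged, but it now serves as the job-scheduler job ID, and the lock document ID is derived from it via LockModel.generateLockId. A minimal self-contained sketch of the resulting IDs follows; the helpers mirror getShrinkJobID/getShrinkLockID from this diff, while generateLockId is stubbed here as "$jobIndexName-$jobId" (the relationship StepUtilsTests asserts via lockModel.lockId) and the config index name is an assumption for illustration:

```kotlin
// Sketch of the new shrink lock ID scheme; not the plugin's actual code.
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config" // assumed value of the config index constant
const val LOCK_SOURCE_JOB_ID = "shrink-node_name"

fun generateLockId(jobIndexName: String, jobId: String) = "$jobIndexName-$jobId" // stand-in for LockModel.generateLockId

fun getShrinkJobID(nodeName: String) = "$LOCK_SOURCE_JOB_ID-$nodeName"

fun getShrinkLockID(nodeName: String) = generateLockId(INDEX_MANAGEMENT_INDEX, getShrinkJobID(nodeName))

fun main() {
    println(getShrinkJobID("node-1"))  // shrink-node_name-node-1 -> the LockModel jobId (same string as the old lock ID)
    println(getShrinkLockID("node-1")) // .opendistro-ism-config-shrink-node_name-node-1 -> the lock document ID
}
```

So the old lock ID string survives as the job ID, and the stored lock document ID simply gains the job index name prefix; getShrinkLockModel and the renamed test assertion reflect this split.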
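And a sketch of the free-space arithmetic behind the renamed getNodeFreeDiskSpaceAfterShrink, as exercised by the updated StepUtilsTests: the source index is duplicated on the target node during the shrink, so the check reserves twice the index size plus the disk high-watermark threshold, and reports -1 when that reservation exceeds the node's free bytes. The function name and values below are illustrative, not the plugin's actual code:

```kotlin
// Illustrative re-derivation of the disk-space check: a node qualifies only if
// freeBytes - (2 * indexSize + highWatermarkThreshold) stays positive.
fun freeDiskSpaceAfterShrink(freeBytes: Long, indexSizeInBytes: Long, highWatermarkThresholdBytes: Long): Long {
    val remaining = freeBytes - (2 * indexSizeInBytes + highWatermarkThresholdBytes)
    return if (remaining > 0) remaining else -1L
}

fun main() {
    val gib = 1024L * 1024L * 1024L
    // 100 GiB free, 20 GiB index, 10 GiB watermark threshold -> 50 GiB of headroom remains
    println(freeDiskSpaceAfterShrink(100 * gib, 20 * gib, 10 * gib) / gib) // 50
    // 35 GiB free is not enough, since 2 * 20 + 10 = 50 GiB is required -> -1
    println(freeDiskSpaceAfterShrink(35 * gib, 20 * gib, 10 * gib)) // -1
}
```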