From 0373ba8cd595c0fd5ccb6dd60116aec79e09a052 Mon Sep 17 00:00:00 2001
From: Clay Downs
Date: Fri, 1 Apr 2022 21:31:12 +0000
Subject: [PATCH] Testing job lock

Signed-off-by: Clay Downs
---
 .../model/ShrinkActionProperties.kt           |  16 +-
 .../action/ShrinkAction.kt                    |   6 +
 .../step/shrink/AttemptMoveShardsStep.kt      | 166 ++++++++++++------
 .../step/shrink/AttemptShrinkStep.kt          |  77 ++++++--
 .../step/shrink/WaitForMoveShardsStep.kt      |  59 ++++---
 .../step/shrink/WaitForShrinkStep.kt          |  79 ++++++---
 .../util/ManagedIndexUtils.kt                 |  39 +++-
 .../indexstatemanagement/util/StepUtils.kt    |  68 ++++++-
 .../action/ShrinkActionIT.kt                  |  67 +++++++
 .../indexstatemanagement/model/ActionTests.kt |   4 -
 10 files changed, 447 insertions(+), 134 deletions(-)

diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ShrinkActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ShrinkActionProperties.kt
index f5d236c5c..07ded7a10 100644
--- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ShrinkActionProperties.kt
+++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ShrinkActionProperties.kt
@@ -20,7 +20,8 @@ data class ShrinkActionProperties(
     val targetNumShards: Int,
     val lockPrimaryTerm: Long,
     val lockSeqNo: Long,
-    val lockEpochSecond: Long
+    val lockEpochSecond: Long,
+    val lockDurationSecond: Long
 ) : Writeable, ToXContentFragment {
 
     override fun writeTo(out: StreamOutput) {
@@ -30,6 +31,7 @@ data class ShrinkActionProperties(
         out.writeLong(lockPrimaryTerm)
         out.writeLong(lockSeqNo)
         out.writeLong(lockEpochSecond)
+        out.writeLong(lockDurationSecond)
     }
 
     override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
@@ -39,6 +41,7 @@ data class ShrinkActionProperties(
         builder.field(ShrinkProperties.LOCK_SEQ_NO.key, lockSeqNo)
         builder.field(ShrinkProperties.LOCK_PRIMARY_TERM.key, lockPrimaryTerm)
         builder.field(ShrinkProperties.LOCK_EPOCH_SECOND.key, lockEpochSecond)
+        builder.field(ShrinkProperties.LOCK_DURATION_SECOND.key, lockDurationSecond)
         return builder
     }
 
@@ -52,8 +55,9 @@ data class ShrinkActionProperties(
         val lockPrimaryTerm: Long = si.readLong()
         val lockSeqNo: Long = si.readLong()
         val lockEpochSecond: Long = si.readLong()
+        val lockDurationSecond: Long = si.readLong()
 
-        return ShrinkActionProperties(nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond)
+        return ShrinkActionProperties(nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond, lockDurationSecond)
     }
 
     fun parse(xcp: XContentParser): ShrinkActionProperties {
@@ -63,6 +67,7 @@ data class ShrinkActionProperties(
         var lockPrimaryTerm: Long? = null
         var lockSeqNo: Long? = null
         var lockEpochSecond: Long? = null
+        var lockDurationSecond: Long? = null
 
         XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
         while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -76,6 +81,7 @@ data class ShrinkActionProperties(
                 ShrinkProperties.LOCK_PRIMARY_TERM.key -> lockPrimaryTerm = xcp.longValue()
                 ShrinkProperties.LOCK_SEQ_NO.key -> lockSeqNo = xcp.longValue()
                 ShrinkProperties.LOCK_EPOCH_SECOND.key -> lockEpochSecond = xcp.longValue()
+                ShrinkProperties.LOCK_DURATION_SECOND.key -> lockDurationSecond = xcp.longValue()
             }
         }
 
@@ -85,7 +91,8 @@ data class ShrinkActionProperties(
             requireNotNull(targetNumShards),
             requireNotNull(lockPrimaryTerm),
             requireNotNull(lockSeqNo),
-            requireNotNull(lockEpochSecond)
+            requireNotNull(lockEpochSecond),
+            requireNotNull(lockDurationSecond)
         )
     }
 }
@@ -96,6 +103,7 @@ data class ShrinkActionProperties(
         TARGET_NUM_SHARDS("target_num_shards"),
         LOCK_SEQ_NO("lock_seq_no"),
         LOCK_PRIMARY_TERM("lock_primary_term"),
-        LOCK_EPOCH_SECOND("lock_epoch_second")
+        LOCK_EPOCH_SECOND("lock_epoch_second"),
+        LOCK_DURATION_SECOND("lock_duration_second")
     }
 }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
index 4af08c345..cfde34f03 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkAction.kt
@@ -40,6 +40,9 @@ class ShrinkAction(
         } else if (numNewShards != null) {
             require(numNewShards > 0) { "Shrink action numNewShards must be greater than 0." }
         }
+        if (targetIndexSuffix != null) {
+            require(!targetIndexSuffix.contains('*') && !targetIndexSuffix.contains('?')) { "Target index suffix must not contain wildcards." }
+        }
     }
 
     private val attemptMoveShardsStep = AttemptMoveShardsStep(this)
@@ -71,6 +74,9 @@ class ShrinkAction(
                 AttemptShrinkStep.name -> waitForShrinkStep
                 else -> stepNameToStep[currentStep]!!
             }
+        } else if (currentStepStatus == Step.StepStatus.FAILED) {
+            // If we failed at any point, retries should start from the beginning
+            return attemptMoveShardsStep
         }
         // step not completed
         return stepNameToStep[currentStep]!!
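
Context for the new lockDurationSecond field: job-scheduler resource locks are renewed by
rebuilding the originally acquired LockModel from persisted state, so every value that went
into the lock (sequence number, primary term, creation epoch second, and now the duration)
must round-trip through ShrinkActionProperties. A minimal sketch of that flow, reusing the
helpers this patch adds in StepUtils.kt (illustrative only, not part of the patch):

    // Sketch: renew the node lock that AttemptMoveShardsStep acquired earlier.
    // Assumes the getShrinkLockModel(...) overload added in StepUtils.kt and the
    // job-scheduler LockService reachable through JobExecutionContext.
    suspend fun renewNodeLock(props: ShrinkActionProperties, jobContext: JobExecutionContext): LockModel? {
        // Rebuild the lock exactly as it was acquired; a differing duration or epoch second
        // would produce a lock that no longer matches the stored lock document.
        val lock = getShrinkLockModel(
            props.nodeName, jobContext.jobIndexName, jobContext.jobId,
            props.lockEpochSecond, props.lockPrimaryTerm, props.lockSeqNo, props.lockDurationSecond
        )
        // renewLock succeeds only while seqNo/primaryTerm still match that lock document.
        return try {
            jobContext.lockService.suspendUntil { renewLock(lock, it) }
        } catch (e: Exception) {
            null
        }
    }
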
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
index 9f6ba0ec7..059caa8b4 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt
@@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.shrink
 
 import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchSecurityException
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
 import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest
@@ -13,15 +14,21 @@ import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
 import org.opensearch.action.support.master.AcknowledgedResponse
+import org.opensearch.client.Client
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.collect.Tuple
 import org.opensearch.common.settings.Settings
 import org.opensearch.index.shard.DocsStats
 import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
-import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
-import org.opensearch.indexmanagement.indexstatemanagement.util.getFreeBytesThresholdHigh
+import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
+import org.opensearch.indexmanagement.indexstatemanagement.util.getIntervalFromManagedIndexConfig
+import org.opensearch.indexmanagement.indexstatemanagement.util.getManagedIndexConfig
+import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
 import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
 import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
@@ -34,11 +41,12 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaDat
 import org.opensearch.jobscheduler.repackage.com.cronutils.utils.VisibleForTesting
 import org.opensearch.jobscheduler.spi.JobExecutionContext
 import org.opensearch.jobscheduler.spi.LockModel
-import java.lang.Exception
-import java.time.Duration
-import java.time.Instant
 import java.util.PriorityQueue
-import kotlin.collections.ArrayList
 import kotlin.math.ceil
 import kotlin.math.floor
 import kotlin.math.min
@@ -56,14 +64,13 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         val context = this.context ?: return this
         val client = context.client
         val indexName = context.metadata.index
-        try {
-            if (actionTimedOut(context.metadata)) return this
 
+        try {
             val shrinkTargetIndexName = indexName + (action.targetIndexSuffix ?: DEFAULT_TARGET_SUFFIX)
             if (targetIndexNameExists(context.clusterService, shrinkTargetIndexName)) return this
 
             if (!isIndexGreen(client, indexName)) {
-                info = mapOf("message" to FAILURE_MESSAGE)
+                info = mapOf("message" to INDEX_NOT_GREEN_MESSAGE)
                 stepStatus = StepStatus.CONDITION_NOT_MET
                 return this
             }
@@ -72,7 +79,6 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
 
             // Fail if there is only one primary shard, as that cannot be shrunk
             val numOriginalShards = context.clusterService.state().metadata.indices[indexName].numberOfShards
-            //
             if (numOriginalShards == 1) {
                 info = mapOf("message" to ONE_PRIMARY_SHARD_MESSAGE)
                 stepStatus = StepStatus.COMPLETED
@@ -87,8 +93,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
             val statsStore = statsResponse.total.store
             val statsDocs = statsResponse.total.docs
             if (statsStore == null || statsDocs == null) {
-                info = mapOf("message" to FAILURE_MESSAGE)
-                stepStatus = StepStatus.FAILED
+                fail(FAILURE_MESSAGE)
                 return this
             }
             val indexSize = statsStore.sizeInBytes
@@ -98,8 +103,12 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
 
             // get the nodes with enough memory in increasing order of free space
             val suitableNodes = findSuitableNodes(context, statsResponse, indexSize)
+
+            // Get the job interval to use in determining the lock length
+            val interval = getJobIntervalSeconds(context.metadata.indexUuid, client)
+
+            // iterate through the nodes and try to acquire a lock on one
-            val lock = acquireLockOnNode(context.jobContext, suitableNodes)
+            val lock = acquireLockFromNodeList(context.jobContext, suitableNodes, interval)
             if (lock == null) {
                 logger.info("$indexName could not find available node to shrink onto.")
                 info = mapOf("message" to NO_AVAILABLE_NODES_MESSAGE)
@@ -113,27 +122,83 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
                 numTargetShards,
                 lock.primaryTerm,
                 lock.seqNo,
-                lock.lockTime.epochSecond
+                lock.lockTime.epochSecond,
+                lock.lockDurationSeconds
             )
             setToReadOnlyAndMoveIndexToNode(context, nodeName, lock)
             info = mapOf("message" to getSuccessMessage(nodeName))
             stepStatus = StepStatus.COMPLETED
             return this
+        } catch (e: OpenSearchSecurityException) {
+            fail(getSecurityFailureMessage(e.localizedMessage), e.message)
+            return this
         } catch (e: Exception) {
-            info = mapOf("message" to FAILURE_MESSAGE, "cause" to "{${e.message}}")
-            stepStatus = StepStatus.FAILED
+            fail(FAILURE_MESSAGE, e.message)
             return this
         }
     }
 
+    private fun fail(message: String, cause: String? = null) {
+        info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
+        stepStatus = StepStatus.FAILED
+    }
+
+    private suspend fun getJobIntervalSeconds(indexUuid: String, client: Client): Long? {
+        val managedIndexConfig: ManagedIndexConfig?
+        try {
+            managedIndexConfig = getManagedIndexConfig(indexUuid, client)
+        } catch (e: Exception) {
+            // If we fail to get the managedIndexConfig, return null; the default lock duration of 3 hours will be used later
+            return null
+        }
+        // Divide the interval by 1000 to convert from ms to seconds
+        return managedIndexConfig?.let { getIntervalFromManagedIndexConfig(it) / 1000L }
+    }
+
     private fun shouldFailTooManyDocuments(docsStats: DocsStats, numTargetShards: Int): Boolean {
         val totalDocs: Long = docsStats.count
         val docsPerTargetShard: Long = totalDocs / numTargetShards
-        // The maximum number of documents per shard is 2^31
-        val maximumDocsPerShard = 0x80000000
-        if (docsPerTargetShard > maximumDocsPerShard) {
-            info = mapOf("message" to TOO_MANY_DOCS_FAILURE_MESSAGE)
-            stepStatus = StepStatus.FAILED
+        if (docsPerTargetShard > MAXIMUM_DOCS_PER_SHARD) {
+            fail(TOO_MANY_DOCS_FAILURE_MESSAGE)
             return true
         }
         return false
@@ -149,8 +214,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         val numReplicas = clusterService.state().metadata.indices[indexName].numberOfReplicas
         val shouldFailForceUnsafeCheck = numReplicas == 0
         if (shouldFailForceUnsafeCheck) {
-            info = mapOf("message" to UNSAFE_FAILURE_MESSAGE)
-            stepStatus = StepStatus.FAILED
+            fail(UNSAFE_FAILURE_MESSAGE)
             return true
         }
         return false
@@ -159,8 +223,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
     private fun targetIndexNameExists(clusterService: ClusterService, shrinkTargetIndexName: String): Boolean {
         val indexExists = clusterService.state().metadata.indices.containsKey(shrinkTargetIndexName)
         if (indexExists) {
-            info = mapOf("message" to getIndexExistsMessage(shrinkTargetIndexName))
-            stepStatus = StepStatus.FAILED
+            fail(getIndexExistsMessage(shrinkTargetIndexName))
             return true
         }
         return false
@@ -175,8 +238,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         try {
             val response: AcknowledgedResponse = issueUpdateSettingsRequest(stepContext.client, stepContext.metadata.index, updateSettings)
             if (!response.isAcknowledged) {
-                stepStatus = StepStatus.FAILED
-                info = mapOf("message" to UPDATE_FAILED_MESSAGE)
+                fail(UPDATE_FAILED_MESSAGE)
                 jobContext.lockService.suspendUntil { release(lock, it) }
             }
         } catch (e: Exception) {
@@ -190,11 +252,13 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
      * Iterates through each suitable node in order, attempting to acquire a resource lock. Returns the first lock which
      * is successfully acquired.
      */
-    private suspend fun acquireLockOnNode(jobContext: JobExecutionContext, suitableNodes: List<String>): LockModel? {
+    private suspend fun acquireLockFromNodeList(jobContext: JobExecutionContext, suitableNodes: List<String>, jobIntervalSeconds: Long?): LockModel? {
         for (node in suitableNodes) {
            val nodeResourceObject = mapOf(RESOURCE_NAME to node)
-            // TODO CLAY, the lock should be the timeout for all steps, not just one??
-            val lockTime = action.configTimeout?.timeout?.seconds ?: MOVE_SHARDS_TIMEOUT_IN_SECONDS
+            // If we couldn't get the job interval for the lock, use the default of 3 hours.
+            // The lock duration is three times the job interval plus 30 minutes so that the next step's execution can
+            // renew the lock before it expires; with maximum jitter, up to 2x the job interval may pass before the next step runs.
+            val lockTime = jobIntervalSeconds?.let { (it * 3) + (30 * 60) } ?: DEFAULT_LOCK_INTERVAL
             val lock: LockModel? = jobContext.lockService.suspendUntil {
                 acquireLockOnResource(jobContext, lockTime, RESOURCE_TYPE, nodeResourceObject, it)
             }
@@ -222,35 +286,31 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         val comparator = kotlin.Comparator { o1: Tuple<Long, String>, o2: Tuple<Long, String> -> o1.v1().compareTo(o2.v1()) }
         val nodesWithSpace = PriorityQueue(comparator)
         for (node in nodesList) {
-            val osStats = node.os
-            if (osStats != null) {
-                val memLeftInNode = osStats.mem.free.bytes
-                val totalNodeMem = osStats.mem.total.bytes
-                val freeBytesThresholdHigh = getFreeBytesThresholdHigh(stepContext.settings, stepContext.clusterService.clusterSettings, totalNodeMem)
-                // We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free
-                val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
-                if (memLeftInNode > requiredBytes) {
-                    val memLeftAfterTransfer: Long = memLeftInNode - requiredBytes
-                    nodesWithSpace.add(Tuple(memLeftAfterTransfer, node.node.name))
-                }
+            val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.settings, stepContext.clusterService.clusterSettings)
+            if (remainingMem > 0L) {
+                nodesWithSpace.add(Tuple(remainingMem, node.node.name))
             }
         }
         val suitableNodes: ArrayList<String> = ArrayList()
         // For each node, do a dry run of moving all shards to the node to make sure there is enough space.
         // This should be rejected if allocation puts it above the low disk watermark setting
         for (sizeNodeTuple in nodesWithSpace) {
-            val nodeName = sizeNodeTuple.v2()
+            val targetNodeName = sizeNodeTuple.v2()
             val indexName = stepContext.metadata.index
             val clusterRerouteRequest = ClusterRerouteRequest().explain(true).dryRun(true)
+            var numberOfRerouteRequests = 0
             for (shard in indicesStatsResponse.shards) {
                 val shardId = shard.shardRouting.shardId()
                 val currentShardNode = stepContext.clusterService.state().nodes[shard.shardRouting.currentNodeId()]
-                clusterRerouteRequest.add(MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, nodeName))
+                // Don't attempt a dry run for shards which are already on that node
+                if (currentShardNode.name == targetNodeName) continue
+                clusterRerouteRequest.add(MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, targetNodeName))
+                numberOfRerouteRequests++
             }
             val clusterRerouteResponse: ClusterRerouteResponse =
                 stepContext.client.admin().cluster().suspendUntil { reroute(clusterRerouteRequest, it) }
             // Should be the same number of yes decisions as the number of primary shards
-            if (clusterRerouteResponse.explanations.yesDecisionMessages.size == indicesStatsResponse.shards.size) {
+            if (clusterRerouteResponse.explanations.yesDecisionMessages.size == numberOfRerouteRequests) {
                 suitableNodes.add(sizeNodeTuple.v2())
             }
         }
@@ -323,18 +383,6 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         info = mutableInfo.toMap()
     }
 
-    private fun actionTimedOut(managedIndexMetadata: ManagedIndexMetaData): Boolean {
-        val timeFromActionStarted: Duration = Duration.between(getActionStartTime(managedIndexMetadata), Instant.now())
-        val timeOutInSeconds = action.configTimeout?.timeout?.seconds ?: MOVE_SHARDS_TIMEOUT_IN_SECONDS
-        // Get ActionTimeout if given, otherwise use default timeout of 12 hours
-        if (timeFromActionStarted.toSeconds() > timeOutInSeconds) {
-            info = mapOf("message" to TIMEOUT_MESSAGE)
-            stepStatus = StepStatus.FAILED
-            return true
-        }
-        return false
-    }
-
     override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
         val currentActionMetaData = currentMetadata.actionMetaData
         // If we succeeded because there was only one source primary shard, we no-op by skipping to the last step
@@ -362,18 +410,20 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         const val ROUTING_SETTING = "index.routing.allocation.require._name"
         const val RESOURCE_NAME = "node_name"
         const val DEFAULT_TARGET_SUFFIX = "_shrunken"
-        const val MOVE_SHARDS_TIMEOUT_IN_SECONDS = 43200L // 12hrs in seconds
         const val name = "attempt_move_shards_step"
         const val RESOURCE_TYPE = "shrink"
-        const val TIMEOUT_MESSAGE = "Timed out waiting for finding node."
-        const val UPDATE_FAILED_MESSAGE = "Shrink failed because settings could not be updated.."
-        const val NO_AVAILABLE_NODES_MESSAGE = "There are no available nodes for to move to to execute a shrink. Delaying until node becomes available."
+        const val UPDATE_FAILED_MESSAGE = "Shrink failed because settings could not be updated."
+        const val NO_AVAILABLE_NODES_MESSAGE = "There are no available nodes to move shards to in order to execute a shrink. Delaying until a node becomes available."
+        const val DEFAULT_LOCK_INTERVAL = 3L * 60L * 60L // Default lock interval is 3 hours in seconds
         const val UNSAFE_FAILURE_MESSAGE = "Shrink failed because index has no replicas and force_unsafe is not set to true."
         const val ONE_PRIMARY_SHARD_MESSAGE = "Shrink action did not do anything because source index only has one primary shard."
         const val TOO_MANY_DOCS_FAILURE_MESSAGE = "Shrink failed because there would be too many documents on each target shard following the shrink."
+        const val INDEX_NOT_GREEN_MESSAGE = "Shrink action cannot start moving shards as the index is not green."
         const val FAILURE_MESSAGE = "Shrink failed to start moving shards."
+        private const val MAXIMUM_DOCS_PER_SHARD = 0x80000000 // The maximum number of documents per shard is 2^31
         fun getSuccessMessage(node: String) = "Successfully started moving the shards to $node."
         fun getIndexExistsMessage(newIndex: String) = "Shrink failed because $newIndex already exists."
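+        // Used by the OpenSearchSecurityException catch block above when a transport call is rejected for missing permissions.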
+        fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
     }
 }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
index ca8962b07..23e14115e 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt
@@ -6,13 +6,20 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.shrink
 
 import org.apache.logging.log4j.LogManager
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
 import org.opensearch.action.admin.indices.shrink.ResizeRequest
 import org.opensearch.action.admin.indices.shrink.ResizeResponse
+import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
+import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
 import org.opensearch.common.settings.Settings
 import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
 import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
+import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
+import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
 import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
 import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
+import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -34,8 +41,12 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
         val actionMetadata = context.metadata.actionMetaData
         val shrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
         if (shrinkActionProperties == null) {
-            info = mapOf("message" to "Shrink action properties are null, metadata was not properly populated")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
+            return this
+        }
+        val lock = renewShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        if (lock == null) {
+            cleanupAndFail("Failed to renew lock on node [${shrinkActionProperties.nodeName}]")
             return this
         }
         try {
@@ -44,24 +55,69 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
                 info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
                 return this
             }
+            if (!isNodeStillSuitable(shrinkActionProperties.nodeName, indexName, context)) return this
+
+            // If the resize index API fails, the step will be set to failed and resizeIndex will return false
             if (!resizeIndex(indexName, shrinkActionProperties, context)) return this
             info = mapOf("message" to getSuccessMessage(shrinkActionProperties.targetIndexName))
             stepStatus = StepStatus.COMPLETED
             return this
         } catch (e: RemoteTransportException) {
-            info = mapOf("message" to FAILURE_MESSAGE)
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(FAILURE_MESSAGE)
             return this
         } catch (e: Exception) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            info = mapOf("message" to FAILURE_MESSAGE, "cause" to "{${e.message}}")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(FAILURE_MESSAGE, e.message)
             return this
         }
     }
 
+    // Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
+    private suspend fun cleanupAndFail(message: String, cause: String? = null) {
+        info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
+        stepStatus = StepStatus.FAILED
+        val context = this.context ?: return
+        try {
+            clearReadOnlyAndRouting(context.metadata.index, context.client)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
+        }
+        try {
+            val shrinkActionProperties = context.metadata.actionMetaData?.actionProperties?.shrinkActionProperties ?: return
+            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
+        }
+    }
+
+    private suspend fun isNodeStillSuitable(nodeName: String, indexName: String, context: StepContext): Boolean {
+        // Get the size of the index
+        val statsRequest = IndicesStatsRequest().indices(indexName)
+        val statsResponse: IndicesStatsResponse = context.client.admin().indices().suspendUntil {
+            stats(statsRequest, it)
+        }
+        val statsStore = statsResponse.total.store
+        if (statsStore == null) {
+            cleanupAndFail(FAILURE_MESSAGE)
+            return false
+        }
+        val indexSizeInBytes = statsStore.sizeInBytes
+        // Get the remaining memory in the node
+        val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.OS_METRIC)
+        val nodeStatsResponse: NodesStatsResponse = context.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
+        // If the node has been replaced, this will fail
+        val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName }
+        if (node == null) {
+            cleanupAndFail(FAILURE_MESSAGE)
+            return false
+        }
+        val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings)
+        if (remainingMem < 1L) {
+            cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
+            return false
+        }
+        return true
+    }
+
     private suspend fun resizeIndex(sourceIndex: String, shrinkActionProperties: ShrinkActionProperties, context: StepContext): Boolean {
         val targetIndex = shrinkActionProperties.targetIndexName
         val req = ResizeRequest(targetIndex, sourceIndex)
@@ -74,9 +130,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
         action.aliases?.forEach { req.targetIndexRequest.alias(it) }
         val resizeResponse: ResizeResponse = context.client.admin().indices().suspendUntil { resizeIndex(req, it) }
         if (!resizeResponse.isAcknowledged) {
-            info = mapOf("message" to FAILURE_MESSAGE)
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(FAILURE_MESSAGE)
             return false
         }
         return true
@@ -97,6 +151,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
     companion object {
         const val name = "attempt_shrink_step"
         const val FAILURE_MESSAGE = "Shrink failed when sending shrink request."
+        const val NOT_ENOUGH_SPACE_FAILURE_MESSAGE = "Shrink failed as the selected node no longer had enough free space to shrink to."
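+        // Returned by isNodeStillSuitable when the chosen node no longer has 2x the index size free above the
+        // high disk watermark (see getNodeFreeMemoryAfterShrink in StepUtils.kt).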
         const val INDEX_HEALTH_NOT_GREEN_MESSAGE = "Shrink delayed because index health is not green."
         fun getSuccessMessage(newIndex: String) = "Shrink started. $newIndex currently being populated."
     }
 }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
index 02e4c8d0f..8c8f56449 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt
@@ -9,14 +9,15 @@ import org.apache.logging.log4j.LogManager
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
 import org.opensearch.action.admin.indices.stats.ShardStats
-import org.opensearch.common.collect.ImmutableOpenIntMap
 import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
+import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
 import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
 import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
+import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
-import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
 import org.opensearch.transport.RemoteTransportException
@@ -36,10 +37,16 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         val actionMetadata = context.metadata.actionMetaData
         val shrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
         if (shrinkActionProperties == null) {
-            info = mapOf("message" to "Shrink action properties are null, metadata was not properly populated")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(METADATA_FAILURE_MESSAGE)
             return this
         }
+        val lock = renewShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        if (lock == null) {
+            cleanupAndFail("Failed to renew lock on node [${shrinkActionProperties.nodeName}]")
+            return this
+        }
         try {
             val indexStatsRequests: IndicesStatsRequest = IndicesStatsRequest().indices(indexName)
             val response: IndicesStatsResponse = context.client.admin().indices().suspendUntil { stats(indexStatsRequests, it) }
@@ -57,7 +64,9 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
                 if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) {
                     numShardsOnNode++
                 }
-                if (numReplicas == 0 || inSyncReplicaExists(routingInfo.id, inSyncAllocations)) {
+                // Either there must be no replicas (force unsafe must have been set) or all replicas must be in sync as
+                // it isn't known which shard (any replica or primary) will be moved to the target node and used in the shrink.
+                if (numReplicas == 0 || inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) {
                     numShardsInSync++
                 }
             }
@@ -68,23 +77,35 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
             } else {
                 val numShardsNotOnNode = numPrimaryShards - numShardsOnNode
                 val numShardsNotInSync = numPrimaryShards - numShardsInSync
-                checkTimeOut(context, shrinkActionProperties, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto)
+                checkTimeOut(context, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto)
             }
             return this
         } catch (e: RemoteTransportException) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            info = mapOf("message" to FAILURE_MESSAGE)
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(FAILURE_MESSAGE)
             return this
         } catch (e: Exception) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            info = mapOf("message" to FAILURE_MESSAGE, "cause" to "{${e.message}}")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(FAILURE_MESSAGE, cause = e.message)
             return this
         }
     }
 
-    private fun inSyncReplicaExists(shardId: Int, inSyncAllocations: ImmutableOpenIntMap<Set<String>>): Boolean = inSyncAllocations[shardId].size > 1
+    // Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
+    private suspend fun cleanupAndFail(message: String, cause: String? = null) {
+        info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
+        stepStatus = StepStatus.FAILED
+        val context = this.context ?: return
+        try {
+            clearReadOnlyAndRouting(context.metadata.index, context.client)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
+        }
+        try {
+            val shrinkActionProperties = context.metadata.actionMetaData?.actionProperties?.shrinkActionProperties ?: return
+            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
+        }
+    }
 
     override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
         // Saving maxNumSegments in ActionProperties after the force merge operation has begun so that if a ChangePolicy occurred
@@ -100,7 +121,6 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
 
     private suspend fun checkTimeOut(
         stepContext: StepContext,
-        shrinkActionProperties: ShrinkActionProperties,
         numShardsNotOnNode: Int,
         numShardsNotInSync: Int,
         nodeToMoveOnto: String
@@ -110,23 +130,19 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
         val timeSinceActionStarted: Duration = Duration.between(getActionStartTime(managedIndexMetadata), Instant.now())
         val timeOutInSeconds = action.configTimeout?.timeout?.seconds ?: MOVE_SHARDS_TIMEOUT_IN_SECONDS
         // Get ActionTimeout if given, otherwise use default timeout of 12 hours
-        stepStatus = if (timeSinceActionStarted.toSeconds() > timeOutInSeconds) {
+        if (timeSinceActionStarted.toSeconds() > timeOutInSeconds) {
             logger.error(
                 "Shrink Action move shards failed on [$indexName], the action timed out with [$numShardsNotOnNode] shards not yet " +
                     "moved and [$numShardsNotInSync] shards without an in sync replica."
             )
-            if (managedIndexMetadata.actionMetaData?.actionProperties?.shrinkActionProperties != null) {
-                releaseShrinkLock(shrinkActionProperties, stepContext.jobContext, logger)
-            }
-            info = mapOf("message" to getTimeoutFailure(nodeToMoveOnto))
-            StepStatus.FAILED
+            cleanupAndFail(getTimeoutFailure(nodeToMoveOnto))
         } else {
             logger.debug(
                 "Shrink action move shards step running on [$indexName], [$numShardsNotOnNode] shards need to be moved, " +
                     "[$numShardsNotInSync] shards need an in sync replica."
             )
             info = mapOf("message" to getTimeoutDelay(nodeToMoveOnto))
-            StepStatus.CONDITION_NOT_MET
+            stepStatus = StepStatus.CONDITION_NOT_MET
         }
     }
 
@@ -138,6 +154,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
-        fun getTimeoutFailure(node: String) = "Shrink failed because it took too long to move shards to $node"
-        fun getTimeoutDelay(node: String) = "Shrink delayed because it took too long to move shards to $node"
+        fun getTimeoutFailure(node: String) = "Shrink failed because it took too long to move shards to $node"
+        fun getTimeoutDelay(node: String) = "Shrink delayed because it took too long to move shards to $node"
         const val FAILURE_MESSAGE = "Shrink failed when waiting for shards to move."
+        const val METADATA_FAILURE_MESSAGE = "Shrink action properties are null, metadata was not properly populated"
         const val MOVE_SHARDS_TIMEOUT_IN_SECONDS = 43200L // 12hrs in seconds
         const val RESOURCE_NAME = "node_name"
         const val RESOURCE_TYPE = "shrink"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
index 554c76be2..1e3246e95 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt
@@ -6,19 +6,21 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.shrink
 
 import org.apache.logging.log4j.LogManager
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.client.Client
 import org.opensearch.common.settings.Settings
 import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
+import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
 import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
 import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
 import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
+import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
-import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
 import org.opensearch.transport.RemoteTransportException
@@ -36,8 +38,12 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
         val actionMetadata = context.metadata.actionMetaData
         val shrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
         if (shrinkActionProperties == null) {
-            info = mapOf("message" to "Shrink action properties are null, metadata was not properly populated")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
+            return this
+        }
+        val lock = renewShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        if (lock == null) {
+            cleanupAndFail("Failed to renew lock on node [${shrinkActionProperties.nodeName}]")
             return this
         }
         try {
@@ -45,38 +51,62 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
             val numPrimaryShardsStarted = getNumPrimaryShardsStarted(context.client, targetIndex)
             val numPrimaryShards = context.clusterService.state().metadata.indices[targetIndex].numberOfShards
             if (numPrimaryShards != shrinkActionProperties.targetNumShards || numPrimaryShardsStarted != shrinkActionProperties.targetNumShards) {
-                checkTimeOut(context, shrinkActionProperties, targetIndex)
+                checkTimeOut(context, targetIndex)
                 return this
             }
 
             // Clear source and target allocation, if either fails the step will be set to failed and the function will return false
-            if (!clearAllocationSettings(context, targetIndex, shrinkActionProperties)) return this
-            if (!clearAllocationSettings(context, context.metadata.index, shrinkActionProperties)) return this
+            if (!clearAllocationSettings(context, targetIndex)) return this
+            if (!clearAllocationSettings(context, context.metadata.index)) return this
 
             releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
             stepStatus = StepStatus.COMPLETED
             info = mapOf("message" to SUCCESS_MESSAGE)
             return this
         } catch (e: RemoteTransportException) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            info = mapOf("message" to getFailureMessage(shrinkActionProperties.targetIndexName))
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(getFailureMessage(shrinkActionProperties.targetIndexName))
             return this
         } catch (e: Exception) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            info = mapOf("message" to GENERIC_FAILURE_MESSAGE, "cause" to "{${e.message}}")
-            stepStatus = StepStatus.FAILED
+            cleanupAndFail(GENERIC_FAILURE_MESSAGE, e.message)
             return this
         }
     }
 
-    private suspend fun clearAllocationSettings(context: StepContext, index: String, shrinkActionProperties: ShrinkActionProperties): Boolean {
+    // Sets the action to failed, clears the readonly and allocation settings on the source index, deletes the target index, and releases the shrink lock
+    private suspend fun cleanupAndFail(message: String, cause: String? = null) {
+        info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
+        stepStatus = StepStatus.FAILED
+        val context = this.context ?: return
+        // Using a try/catch for each cleanup action as we should clean up as much as possible despite any failures
+        try {
+            clearReadOnlyAndRouting(context.metadata.index, context.client)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
+        }
+        val shrinkActionProperties = context.metadata.actionMetaData?.actionProperties?.shrinkActionProperties ?: return
+        try {
+            // TODO: use plugin permissions when cleaning up
+            // Delete the target index
+            val deleteRequest = DeleteIndexRequest(shrinkActionProperties.targetIndexName)
+            val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { delete(deleteRequest, it) }
+            if (!response.isAcknowledged) {
+                logger.error("Shrink action failed to delete target index during cleanup after a failure")
+            }
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to delete the target index after a failure: $e")
+        }
+        try {
+            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
+        } catch (e: Exception) {
+            logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
+        }
+    }
+
+    private suspend fun clearAllocationSettings(context: StepContext, index: String): Boolean {
         val allocationSettings = Settings.builder().putNull(AttemptMoveShardsStep.ROUTING_SETTING).build()
         val response: AcknowledgedResponse = issueUpdateSettingsRequest(context.client, index, allocationSettings)
         if (!response.isAcknowledged) {
-            releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
-            stepStatus = StepStatus.FAILED
-            info = mapOf("message" to getFailureMessage(index))
+            cleanupAndFail(getFailureMessage(index))
             return false
         }
         return true
@@ -88,25 +118,16 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
         return targetStatsResponse.shards.filter { it.shardRouting.started() && it.shardRouting.primary() }.size
     }
 
-    private suspend fun checkTimeOut(stepContext: StepContext, shrinkActionProperties: ShrinkActionProperties, targetIndex: String) {
+    private suspend fun checkTimeOut(stepContext: StepContext, targetIndex: String) {
         val managedIndexMetadata = stepContext.metadata
-        val indexName = managedIndexMetadata.index
         val timeFromActionStarted: Duration = Duration.between(getActionStartTime(managedIndexMetadata), Instant.now())
         val timeOutInSeconds = action.configTimeout?.timeout?.seconds ?: WaitForMoveShardsStep.MOVE_SHARDS_TIMEOUT_IN_SECONDS
         // Get ActionTimeout if given, otherwise use default timeout of 12 hours
-        stepStatus = if (timeFromActionStarted.toSeconds() > timeOutInSeconds) {
-            logger.error(
-                "Shards of $indexName have still not started."
-            )
-            releaseShrinkLock(shrinkActionProperties, stepContext.jobContext, logger)
-            info = mapOf("message" to getFailureMessage(targetIndex))
-            StepStatus.FAILED
+        if (timeFromActionStarted.toSeconds() > timeOutInSeconds) {
+            cleanupAndFail(getFailureMessage(targetIndex))
         } else {
-            logger.debug(
-                "Shards of $indexName have still not started."
-            )
             info = mapOf("message" to getDelayedMessage(targetIndex))
-            StepStatus.CONDITION_NOT_MET
+            stepStatus = StepStatus.CONDITION_NOT_MET
         }
     }
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
index b9c9dc7ee..66254ffee 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
@@ -7,19 +7,28 @@
 @file:JvmName("ManagedIndexUtils")
 package org.opensearch.indexmanagement.indexstatemanagement.util
 
 // import inet.ipaddr.IPAddressString
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
 // import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.Logger
 import org.opensearch.action.delete.DeleteRequest
+import org.opensearch.action.get.GetRequest
+import org.opensearch.action.get.GetResponse
 import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.support.WriteRequest
 import org.opensearch.action.update.UpdateRequest
 // import org.opensearch.alerting.destination.message.BaseMessage
+import org.opensearch.client.Client
 import org.opensearch.common.unit.ByteSizeValue
 import org.opensearch.common.unit.TimeValue
+import org.opensearch.common.xcontent.LoggingDeprecationHandler
+import org.opensearch.common.xcontent.NamedXContentRegistry
 import org.opensearch.common.xcontent.ToXContent
 import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.common.xcontent.XContentHelper
+import org.opensearch.common.xcontent.XContentType
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.QueryBuilders
 import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
@@ -37,6 +46,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.Swe
 import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
 import org.opensearch.indexmanagement.opensearchapi.optionalISMTemplateField
 import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.opensearchapi.parseWithType
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
 import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
@@ -535,3 +546,25 @@ enum class MetadataCheck {
 //     }
 //     return false
 // }
+
+@Suppress("BlockingMethodInNonBlockingContext")
+suspend fun getManagedIndexConfig(indexUuid: String, client: Client): ManagedIndexConfig? {
+    val request = GetRequest().routing(indexUuid).index(INDEX_MANAGEMENT_INDEX).id(indexUuid)
+    val response: GetResponse = client.suspendUntil { get(request, it) }
+    var managedIndexConfig: ManagedIndexConfig? = null
+    val configSource = response.sourceAsBytesRef
+    // IntelliJ complains about createParser/parseWithType blocking because it sees they throw IOExceptions
+    configSource?.let {
+        withContext(Dispatchers.IO) {
+            val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, configSource, XContentType.JSON)
+            managedIndexConfig = xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, ManagedIndexConfig.Companion::parse)
+        }
+    }
+    return managedIndexConfig
+}
+
+// Extracts the job scheduler interval from the managed index config and returns the millisecond value
+fun getIntervalFromManagedIndexConfig(managedIndexConfig: ManagedIndexConfig): Long {
+    val periodTuple = managedIndexConfig.jobSchedule.getPeriodStartingAt(Instant.now())
+    return periodTuple.v2().toEpochMilli() - periodTuple.v1().toEpochMilli()
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
index 6b9609add..96ac2bf8c 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt
@@ -8,18 +8,22 @@ package org.opensearch.indexmanagement.indexstatemanagement.util
 
 import org.apache.logging.log4j.Logger
 import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
+import org.opensearch.action.admin.cluster.node.stats.NodeStats
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.client.Client
+import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.routing.allocation.DiskThresholdSettings
 import org.opensearch.common.settings.ClusterSettings
 import org.opensearch.common.settings.Settings
+import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMoveShardsStep
 import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForMoveShardsStep
 import org.opensearch.indexmanagement.opensearchapi.suspendUntil
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
 import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
 import org.opensearch.jobscheduler.spi.JobExecutionContext
 import org.opensearch.jobscheduler.spi.LockModel
+import java.lang.Exception
 import java.time.Instant
 
 suspend fun issueUpdateSettingsRequest(client: Client, indexName: String, settings: Settings): AcknowledgedResponse {
@@ -36,7 +40,33 @@ suspend fun releaseShrinkLock(
     val lock: LockModel = getShrinkLockModel(shrinkActionProperties, jobExecutionContext)
     val released: Boolean = jobExecutionContext.lockService.suspendUntil { release(lock, it) }
     if (!released) {
-        logger.warn("Lock not released on failure")
+        logger.error("Failed to release Shrink action lock on node [${shrinkActionProperties.nodeName}]")
     }
 }
+
+suspend fun releaseShrinkLock(
+    lock: LockModel,
+    jobExecutionContext: JobExecutionContext,
+    logger: Logger
+) {
+    val released: Boolean = jobExecutionContext.lockService.suspendUntil { release(lock, it) }
+    if (!released) {
+        logger.error("Failed to release Shrink action lock on node [${lock.resource[AttemptMoveShardsStep.RESOURCE_NAME] as String}]")
+    }
+}
+
+suspend fun renewShrinkLock(
+    shrinkActionProperties: ShrinkActionProperties,
+    jobExecutionContext: JobExecutionContext,
+    logger: Logger
+): LockModel? {
+    val lock: LockModel = getShrinkLockModel(shrinkActionProperties, jobExecutionContext)
+    return try {
+        jobExecutionContext.lockService.suspendUntil { renewLock(lock, it) }
+    } catch (e: Exception) {
+        logger.error("Failed to renew Shrink action lock on node [${shrinkActionProperties.nodeName}]: $e")
+        null
+    }
+}
 
@@ -50,7 +80,8 @@ fun getShrinkLockModel(
         jobExecutionContext.jobId,
         shrinkActionProperties.lockEpochSecond,
         shrinkActionProperties.lockPrimaryTerm,
-        shrinkActionProperties.lockSeqNo
+        shrinkActionProperties.lockSeqNo,
+        shrinkActionProperties.lockDurationSecond
     )
 }
 
@@ -61,7 +92,8 @@ fun getShrinkLockModel(
     jobId: String,
     lockEpochSecond: Long,
     lockPrimaryTerm: Long,
-    lockSeqNo: Long
+    lockSeqNo: Long,
+    lockDurationSecond: Long
 ): LockModel {
     val resource: HashMap<String, String> = HashMap()
     resource[WaitForMoveShardsStep.RESOURCE_NAME] = nodeName
@@ -72,7 +104,7 @@ fun getShrinkLockModel(
         WaitForMoveShardsStep.RESOURCE_TYPE,
         resource as Map<String, Any>?,
         lockCreationInstant,
-        WaitForMoveShardsStep.MOVE_SHARDS_TIMEOUT_IN_SECONDS,
+        lockDurationSecond,
         false,
         lockSeqNo,
         lockPrimaryTerm
@@ -103,6 +135,25 @@ fun getFreeBytesThresholdHigh(settings: Settings, clusterSettin
     } else diskThresholdBytes.bytes
 }
 
+/*
+ * Returns the amount of memory in the node which will be free below the high watermark level after adding 2 * indexSizeInBytes,
+ * or -1 if adding 2 * indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats.
+ */
+fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, settings: Settings, clusterSettings: ClusterSettings?): Long {
+    val osStats = node.os
+    if (osStats != null) {
+        val memLeftInNode = osStats.mem.free.bytes
+        val totalNodeMem = osStats.mem.total.bytes
+        val freeBytesThresholdHigh = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeMem)
+        // We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free
+        val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
+        if (memLeftInNode > requiredBytes) {
+            return memLeftInNode - requiredBytes
+        }
+    }
+    return -1L
+}
+
 suspend fun isIndexGreen(client: Client, indexName: String): Boolean {
     // get index health, waiting for a green status
     val healthReq = ClusterHealthRequest().indices(indexName).waitForGreenStatus()
@@ -110,3 +161,12 @@ suspend fun isIndexGreen(client: Client, indexName: String): Boolean {
     // The request was set to wait for green index, if the request timed out, the index never was green
     return !response.isTimedOut
 }
+
+suspend fun clearReadOnlyAndRouting(index: String, client: Client): Boolean {
+    val allocationSettings = Settings.builder().putNull(AttemptMoveShardsStep.ROUTING_SETTING).putNull(IndexMetadata.SETTING_BLOCKS_WRITE).build()
+    val response: AcknowledgedResponse = issueUpdateSettingsRequest(client, index, allocationSettings)
+    if (!response.isAcknowledged) {
+        return false
+    }
+    return true
+}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt
index 084609ada..b3e4604cf 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt
@@ -444,4 +444,71 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
             )
         }
     }
+
+    fun `test retries from first step`() {
+        val testPolicy = """
+        {"policy":{"description":"Default policy","default_state":"Shrink","states":[
+        {"name":"Shrink","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"shrink":
+        {"num_new_shards":1, "target_index_suffix":"_shrink_test", "force_unsafe": "true"}}],"transitions":[]}]}}
+        """.trimIndent()
+        val logger = LogManager.getLogger(::ShrinkActionIT)
+        val indexName = "${testIndexName}_retry"
+        val policyID = "${testIndexName}_testPolicyName_retry"
+        createPolicyJson(testPolicy, policyID)
+
+        createIndex(indexName, policyID, null, "0", "3", "")
+        insertSampleData(indexName, 3)
+
+        // Will change the startTime each execution so that it triggers in 2 seconds
+        // First execution: Policy is initialized
+        val managedIndexConfig = getExistingManagedIndexConfig(indexName)
+
+        updateManagedIndexConfigStartTime(managedIndexConfig)
+        waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
+        logger.info("before attempt move shards")
+        // Starts AttemptMoveShardsStep
+        updateManagedIndexConfigStartTime(managedIndexConfig)
+
+        val targetIndexName = indexName + "_shrink_test"
+        waitFor {
+            assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName)
+            assertEquals("true", getIndexBlocksWriteSetting(indexName))
+            assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName)
+            val settings = getFlatSettings(indexName)
+            val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
+            assertTrue(settings.containsKey("index.routing.allocation.require._name"))
+            assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"])
+            assertEquals(
+                AttemptMoveShardsStep.getSuccessMessage(nodeToShrink),
+                getExplainManagedIndexMetaData(indexName).info?.get("message")
+            )
+        }
+        val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
+        // Starts WaitForMoveShardsStep
+        updateManagedIndexConfigStartTime(managedIndexConfig)
+        waitFor {
+            assertEquals(
+                WaitForMoveShardsStep.getSuccessMessage(nodeToShrink),
+                getExplainManagedIndexMetaData(indexName).info?.get("message")
+            )
+        }
+        // Create an index with the target index name so the AttemptShrinkStep fails
+        createIndex(targetIndexName, null)
+
+        // Wait for move should finish before this. Starts AttemptShrinkStep
+        updateManagedIndexConfigStartTime(managedIndexConfig)
+        waitFor(Instant.ofEpochSecond(50)) {
+            val stepMetadata = getExplainManagedIndexMetaData(indexName).stepMetaData
+            assertEquals("Step should have failed because the target index already exists", Step.StepStatus.FAILED, stepMetadata?.stepStatus)
+            assertEquals(AttemptShrinkStep.name, stepMetadata?.name)
+        }
+        // TODO add checks for successful cleanup
+
+        updateManagedIndexConfigStartTime(managedIndexConfig)
+        waitFor {
+            val stepMetadata = getExplainManagedIndexMetaData(indexName).stepMetaData
+            assertEquals("Shrink action should have started over after failing", AttemptMoveShardsStep.name, stepMetadata?.name)
+            assertEquals("Step status should have been starting", Step.StepStatus.STARTING, stepMetadata?.stepStatus)
+        }
+    }
 }
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
index f7829f525..7a71b3481 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
@@ -183,10 +183,6 @@ class ActionTests : OpenSearchTestCase() {
         assertEquals("Free bytes threshold not being calculated correctly for byte setting.", thresholdBytes, byteValue.bytes)
     }
 
-    fun `test for fun`() {
-        println(0x80000000)
-    }
-
     private fun roundTripAction(expectedAction: Action) {
         val baos = ByteArrayOutputStream()
         val osso = OutputStreamStreamOutput(baos)
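
A note on the space check shared by findSuitableNodes and the new isNodeStillSuitable:
getNodeFreeMemoryAfterShrink (StepUtils.kt) requires a node to hold the source shards plus
the shrunken copy (up to 2x the index size) while staying below the high disk watermark.
A self-contained sketch of the arithmetic with made-up numbers (the sizes and the 10% free
threshold below are illustrative assumptions, not values from this patch):

    // Sketch: mirrors the requiredBytes computation in getNodeFreeMemoryAfterShrink.
    fun main() {
        val gib = 1024L * 1024 * 1024
        val indexSizeInBytes = 50 * gib // hypothetical source index size
        val totalNodeBytes = 500 * gib // hypothetical node capacity
        val freeBytesInNode = 200 * gib // hypothetical free bytes on the node
        // With a 90% high watermark, 10% of the node must remain free.
        val freeBytesThresholdHigh = totalNodeBytes / 10
        // The source shards plus the shrunken copy may need up to 2x the index size.
        val requiredBytes = 2 * indexSizeInBytes + freeBytesThresholdHigh
        val remaining = if (freeBytesInNode > requiredBytes) freeBytesInNode - requiredBytes else -1L
        println(remaining / gib) // prints 50: this node qualifies with 50 GiB to spare
    }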