diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt index d1901f111..cf4dafe29 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt @@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon import org.opensearch.indexmanagement.indexstatemanagement.util.getIntervalFromManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.util.getManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink +import org.opensearch.indexmanagement.indexstatemanagement.util.getShardIdToNodeNameSet import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockID import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest @@ -309,6 +310,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) { nodesWithSpace.add(Tuple(remainingMem, node.node.name)) } } + val shardIdToNodeList: Map<Int, Set<String>> = getShardIdToNodeNameSet(indicesStatsResponse, stepContext.clusterService.state().nodes) val suitableNodes: ArrayList<String> = ArrayList() // For each node, do a dry run of moving all shards to the node to make sure that there aren't any other blockers // to the allocation. 
@@ -316,20 +318,20 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) { val targetNodeName = sizeNodeTuple.v2() val indexName = stepContext.metadata.index val clusterRerouteRequest = ClusterRerouteRequest().explain(true).dryRun(true) - var numberOfRerouteRequests = 0 + val requestedShardIds: MutableSet<Int> = mutableSetOf() for (shard in indicesStatsResponse.shards) { val shardId = shard.shardRouting.shardId() val currentShardNode = stepContext.clusterService.state().nodes[shard.shardRouting.currentNodeId()] - // Don't attempt a dry run for shards which are already on that node - if (currentShardNode.name == targetNodeName) continue + // Don't attempt a dry run for shards which have a copy already on that node + if (shardIdToNodeList[shardId.id]?.contains(targetNodeName) == true || requestedShardIds.contains(shardId.id)) continue clusterRerouteRequest.add(MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, targetNodeName)) - numberOfRerouteRequests++ + requestedShardIds.add(shardId.id) } val clusterRerouteResponse: ClusterRerouteResponse = stepContext.client.admin().cluster().suspendUntil { reroute(clusterRerouteRequest, it) } val numYesDecisions = clusterRerouteResponse.explanations.explanations().count { it.decisions().type().equals((Decision.Type.YES)) } // Should be the same number of yes decisions as the number of primary shards - if (numYesDecisions == numberOfRerouteRequests) { + if (numYesDecisions == requestedShardIds.size) { suitableNodes.add(sizeNodeTuple.v2()) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt index c4577cd08..f1dca81b1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt +++ 
b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt @@ -63,16 +63,16 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) { val nodeToMoveOnto = localShrinkActionProperties.nodeName val inSyncAllocations = context.clusterService.state().metadata.indices[indexName].inSyncAllocationIds val numReplicas = context.clusterService.state().metadata.indices[indexName].numberOfReplicas - var numShardsOnNode = 0 + val shardIdOnNode: MutableMap<Int, Boolean> = mutableMapOf() var numShardsInSync = 0 for (shard: ShardStats in response.shards) { val routingInfo = shard.shardRouting val nodeIdShardIsOn = routingInfo.currentNodeId() val nodeNameShardIsOn = context.clusterService.state().nodes()[nodeIdShardIsOn].name + if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) { + shardIdOnNode[shard.shardRouting.id] = true + } if (routingInfo.primary()) { - if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) { - numShardsOnNode++ - } // Either there must be no replicas (force unsafe must have been set) or all replicas must be in sync as // it isn't known which shard (any replica or primary) will be moved to the target node and used in the shrink. 
if (numReplicas == 0 || inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) { @@ -80,11 +80,11 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) { } } } - if (numShardsOnNode >= numPrimaryShards && numShardsInSync >= numPrimaryShards) { + if (shardIdOnNode.values.all { it } && numShardsInSync >= numPrimaryShards) { info = mapOf("message" to getSuccessMessage(nodeToMoveOnto)) stepStatus = StepStatus.COMPLETED } else { - val numShardsNotOnNode = numPrimaryShards - numShardsOnNode + val numShardsNotOnNode = shardIdOnNode.values.count { !it } val numShardsNotInSync = numPrimaryShards - numShardsInSync checkTimeOut(context, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto) } @@ -166,8 +166,8 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) { companion object { const val name = "wait_for_move_shards_step" fun getSuccessMessage(node: String) = "The shards successfully moved to $node." - fun getTimeoutFailure(node: String) = "Shrink failed because it took to long to move shards to $node" - fun getTimeoutDelay(node: String) = "Shrink delayed because it took to long to move shards to $node" + fun getTimeoutFailure(node: String) = "Shrink failed because it took too long to move shards to $node" + fun getTimeoutDelay(node: String) = "Shrink delayed because it took too long to move shards to $node" const val FAILURE_MESSAGE = "Shrink failed when waiting for shards to move." 
const val METADATA_FAILURE_MESSAGE = "Shrink action properties are null, metadata was not properly populated" const val MOVE_SHARDS_TIMEOUT_IN_SECONDS = 43200L // 12hrs in seconds diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt index 996b3f2a8..a25cd47e3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt @@ -10,9 +10,11 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.cluster.node.stats.NodeStats import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.routing.allocation.DiskThresholdSettings import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings @@ -192,3 +194,20 @@ suspend fun resetReadOnlyAndRouting(index: String, client: Client, originalSetti fun getShrinkLockID(nodeName: String): String { return "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName" } + +// Creates a map of shardIds to the set of node names which the shard copies reside on. For example, with 2 replicas +// each shardId would have a set containing 3 node names, for the nodes of the primary and two replicas. 
+fun getShardIdToNodeNameSet(indicesStatsResponse: IndicesStatsResponse, nodes: DiscoveryNodes): Map<Int, Set<String>> { + val shardIdToNodeList: MutableMap<Int, MutableSet<String>> = mutableMapOf() + for (shard in indicesStatsResponse.shards) { + val shardId = shard.shardRouting.shardId().id + // If nodeName is null, then the nodes could have changed since the indicesStatsResponse, just skip adding it + val nodeName: String = nodes[shard.shardRouting.currentNodeId()].name ?: continue + if (shardIdToNodeList.containsKey(shardId)) { + shardIdToNodeList[shardId]?.add(nodeName) + } else { + shardIdToNodeList[shardId] = mutableSetOf(nodeName) + } + } + return shardIdToNodeList +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 686a09265..77994efa1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -466,6 +466,101 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test shrink with replicas`() { + val logger = LogManager.getLogger(::ShrinkActionIT) + val nodes = getNodes() + if (nodes.size > 1) { + val indexName = "${testIndexName}_with_replicas" + val policyID = "${testIndexName}_with_replicas" + val shrinkAction = ShrinkAction( + numNewShards = null, + maxShardSize = null, + percentageOfSourceShards = 0.5, + targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), + aliases = null, + forceUnsafe = false, + index = 0 + ) + val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 11L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + 
errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, null, "1", "3", "") + insertSampleData(indexName, 3) + // Will change the startTime each execution so that it triggers in 2 seconds + // First execution: Policy is initialized + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + logger.info("index settings: \n ${getFlatSettings(indexName)}") + + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + // Starts AttemptMoveShardsStep + updateManagedIndexConfigStartTime(managedIndexConfig) + val targetIndexName = indexName + testIndexSuffix + waitFor(Instant.ofEpochSecond(60)) { + assertEquals( + targetIndexName, + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName + ) + assertEquals("true", getIndexBlocksWriteSetting(indexName)) + val nodeName = + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + assertNotNull("Couldn't find node to shrink onto.", nodeName) + val settings = getFlatSettings(indexName) + assertTrue(settings.containsKey("index.routing.allocation.require._name")) + assertEquals(nodeName, settings["index.routing.allocation.require._name"]) + assertEquals( + AttemptMoveShardsStep.getSuccessMessage(nodeName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + val nodeToShrink = + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + + // starts WaitForMoveShardsStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { + assertEquals( + WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), + getExplainManagedIndexMetaData(indexName).info?.get("message") + 
) + } + // Wait for move should finish before this. Starts AttemptShrinkStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(50)) { + assertTrue("Target index is not created", indexExists(targetIndexName)) + assertEquals( + AttemptShrinkStep.getSuccessMessage(targetIndexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + // starts WaitForShrinkStep + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor(Instant.ofEpochSecond(60)) { + // one primary and one replica + assertTrue(getIndexShards(targetIndexName).size == 2) + assertEquals( + WaitForShrinkStep.SUCCESS_MESSAGE, + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>> + val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String? + assertNull("Write block setting was not reset after successful shrink", writeBlock) + } + } + } + // TODO This test is excessively flaky, disabling for now but it needs to be fixed private fun `test retries from first step`() { val testPolicy = """