Fixes shard allocation checks
Signed-off-by: Clay Downs <[email protected]>
downsrob committed Apr 18, 2022
1 parent ab1e821 commit 00ad17e
Showing 4 changed files with 129 additions and 13 deletions.
@@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon
import org.opensearch.indexmanagement.indexstatemanagement.util.getIntervalFromManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.util.getManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.getShardIdToNodeNameSet
import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockID
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
@@ -309,27 +310,28 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
nodesWithSpace.add(Tuple(remainingMem, node.node.name))
}
}
val shardIdToNodeList: Map<Int, Set<String>> = getShardIdToNodeNameSet(indicesStatsResponse, stepContext.clusterService.state().nodes)
val suitableNodes: ArrayList<String> = ArrayList()
// For each node, do a dry run of moving all shards to the node to make sure that there aren't any other blockers
// to the allocation.
for (sizeNodeTuple in nodesWithSpace) {
val targetNodeName = sizeNodeTuple.v2()
val indexName = stepContext.metadata.index
val clusterRerouteRequest = ClusterRerouteRequest().explain(true).dryRun(true)
var numberOfRerouteRequests = 0
val requestedShardIds: MutableSet<Int> = mutableSetOf()
for (shard in indicesStatsResponse.shards) {
val shardId = shard.shardRouting.shardId()
val currentShardNode = stepContext.clusterService.state().nodes[shard.shardRouting.currentNodeId()]
// Don't attempt a dry run for shards which are already on that node
if (currentShardNode.name == targetNodeName) continue
// Don't attempt a dry run for shards which have a copy already on that node
if (shardIdToNodeList[shardId.id]?.contains(targetNodeName) == true || requestedShardIds.contains(shardId.id)) continue
clusterRerouteRequest.add(MoveAllocationCommand(indexName, shardId.id, currentShardNode.name, targetNodeName))
numberOfRerouteRequests++
requestedShardIds.add(shardId.id)
}
val clusterRerouteResponse: ClusterRerouteResponse =
stepContext.client.admin().cluster().suspendUntil { reroute(clusterRerouteRequest, it) }
val numYesDecisions = clusterRerouteResponse.explanations.explanations().count { it.decisions().type().equals((Decision.Type.YES)) }
// Should be the same number of yes decisions as the number of primary shards
if (numYesDecisions == numberOfRerouteRequests) {
if (numYesDecisions == requestedShardIds.size) {
suitableNodes.add(sizeNodeTuple.v2())
}
}
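
The dry-run loop above only adds a MoveAllocationCommand for shards that do not already have a copy on the candidate node, and the node is accepted only if every requested shard id comes back with a YES decision. A minimal, self-contained Kotlin sketch of that selection step, using a hypothetical ShardCopy stand-in rather than the plugin's routing types:

// Hypothetical stand-in for one shard copy: which shard it is and which node currently holds it.
data class ShardCopy(val shardId: Int, val nodeName: String)

// Returns the shard ids that would need a dry-run move onto targetNodeName: shards that already
// have a copy on the target node are skipped, and each shard id is requested at most once even
// though it usually has several copies (one primary plus replicas).
fun shardsToDryRun(
    shardCopies: List<ShardCopy>,
    shardIdToNodeNames: Map<Int, Set<String>>,
    targetNodeName: String
): Set<Int> {
    val requestedShardIds = mutableSetOf<Int>()
    for (copy in shardCopies) {
        // Skip copies that are already on the target node.
        if (copy.nodeName == targetNodeName) continue
        // Skip shards with any copy on the target node, or shards that were already requested.
        if (shardIdToNodeNames[copy.shardId]?.contains(targetNodeName) == true ||
            requestedShardIds.contains(copy.shardId)
        ) continue
        requestedShardIds.add(copy.shardId)
    }
    return requestedShardIds
}

fun main() {
    val copies = listOf(
        ShardCopy(0, "node-a"), ShardCopy(0, "node-b"), // shard 0: primary and replica
        ShardCopy(1, "node-b"), ShardCopy(1, "node-c") // shard 1: primary and replica
    )
    val shardIdToNodes = mapOf(0 to setOf("node-a", "node-b"), 1 to setOf("node-b", "node-c"))
    // Shard 0 already has a copy on node-a, so only shard 1 needs a dry-run move there.
    println(shardsToDryRun(copies, shardIdToNodes, "node-a")) // [1]
}
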
@@ -63,28 +63,28 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val nodeToMoveOnto = localShrinkActionProperties.nodeName
val inSyncAllocations = context.clusterService.state().metadata.indices[indexName].inSyncAllocationIds
val numReplicas = context.clusterService.state().metadata.indices[indexName].numberOfReplicas
var numShardsOnNode = 0
val shardIdOnNode: MutableMap<Int, Boolean> = mutableMapOf()
var numShardsInSync = 0
for (shard: ShardStats in response.shards) {
val routingInfo = shard.shardRouting
val nodeIdShardIsOn = routingInfo.currentNodeId()
val nodeNameShardIsOn = context.clusterService.state().nodes()[nodeIdShardIsOn].name
if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) {
shardIdOnNode[shard.shardRouting.id] = true
}
if (routingInfo.primary()) {
if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) {
numShardsOnNode++
}
// Either there must be no replicas (force unsafe must have been set) or all replicas must be in sync as
// it isn't known which shard (any replica or primary) will be moved to the target node and used in the shrink.
if (numReplicas == 0 || inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) {
numShardsInSync++
}
}
}
if (numShardsOnNode >= numPrimaryShards && numShardsInSync >= numPrimaryShards) {
if (shardIdOnNode.values.all { it } && numShardsInSync >= numPrimaryShards) {
info = mapOf("message" to getSuccessMessage(nodeToMoveOnto))
stepStatus = StepStatus.COMPLETED
} else {
val numShardsNotOnNode = numPrimaryShards - numShardsOnNode
val numShardsNotOnNode = shardIdOnNode.values.count { !it }
val numShardsNotInSync = numPrimaryShards - numShardsInSync
checkTimeOut(context, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto)
}
@@ -166,8 +166,8 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
companion object {
const val name = "wait_for_move_shards_step"
fun getSuccessMessage(node: String) = "The shards successfully moved to $node."
fun getTimeoutFailure(node: String) = "Shrink failed because it took to long to move shards to $node"
fun getTimeoutDelay(node: String) = "Shrink delayed because it took to long to move shards to $node"
fun getTimeoutFailure(node: String) = "Shrink failed because it took too long to move shards to $node"
fun getTimeoutDelay(node: String) = "Shrink delayed because it took too long to move shards to $node"
const val FAILURE_MESSAGE = "Shrink failed when waiting for shards to move."
const val METADATA_FAILURE_MESSAGE = "Shrink action properties are null, metadata was not properly populated"
const val MOVE_SHARDS_TIMEOUT_IN_SECONDS = 43200L // 12hrs in seconds
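
The reworked check above asks, per shard id, whether any started copy (primary or replica) is on the target node, instead of counting started primaries only. A small stand-alone sketch of that per-shard bookkeeping, with a hypothetical ShardCopyState in place of ShardStats/ShardRouting; the sketch records an entry for every shard id, defaulting to false, so that values.all { it } only succeeds once each shard has a started copy on the node:

// Hypothetical stand-in for one shard copy's routing state.
data class ShardCopyState(val shardId: Int, val nodeName: String, val started: Boolean)

// True once every shard id has at least one started copy on targetNode.
fun allShardsHaveCopyOnNode(copies: List<ShardCopyState>, targetNode: String): Boolean {
    val shardIdOnNode = mutableMapOf<Int, Boolean>()
    for (copy in copies) {
        val onTarget = copy.nodeName == targetNode && copy.started
        // Keep an earlier 'true' for this shard id from being overwritten by a later copy.
        shardIdOnNode[copy.shardId] = (shardIdOnNode[copy.shardId] ?: false) || onTarget
    }
    return shardIdOnNode.values.all { it }
}

fun main() {
    val copies = listOf(
        ShardCopyState(0, "node-a", started = true), // a copy of shard 0 is already on the target
        ShardCopyState(0, "node-b", started = true),
        ShardCopyState(1, "node-b", started = true) // shard 1 has no copy on node-a yet
    )
    println(allShardsHaveCopyOnNode(copies, "node-a")) // false
    println(allShardsHaveCopyOnNode(copies + ShardCopyState(1, "node-a", started = true), "node-a")) // true
}
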
@@ -10,9 +10,11 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.cluster.node.stats.NodeStats
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
@@ -192,3 +194,20 @@ suspend fun resetReadOnlyAndRouting(index: String, client: Client, originalSetti
fun getShrinkLockID(nodeName: String): String {
return "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName"
}

// Creates a map of shard ids to the set of node names on which copies of that shard reside. For example, with 2 replicas
// each shard id maps to a set of 3 node names: the nodes holding the primary and the two replicas.
fun getShardIdToNodeNameSet(indicesStatsResponse: IndicesStatsResponse, nodes: DiscoveryNodes): Map<Int, Set<String>> {
val shardIdToNodeList: MutableMap<Int, MutableSet<String>> = mutableMapOf()
for (shard in indicesStatsResponse.shards) {
val shardId = shard.shardRouting.shardId().id
// If nodeName is null, the nodes may have changed since the indicesStatsResponse was taken; skip adding this copy
val nodeName: String = nodes[shard.shardRouting.currentNodeId()].name ?: continue
if (shardIdToNodeList.containsKey(shardId)) {
shardIdToNodeList[shardId]?.add(nodeName)
} else {
shardIdToNodeList[shardId] = mutableSetOf(nodeName)
}
}
return shardIdToNodeList
}
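
For illustration, the grouping that getShardIdToNodeNameSet builds can be sketched over plain (shardId, nodeName) pairs; the input here is an assumed simplification, not the IndicesStatsResponse the real helper consumes:

// Hypothetical input: one (shardId, nodeName?) pair per shard copy, where the node name may be
// null if the node has left the cluster since the stats were taken.
fun shardIdToNodeNames(copies: List<Pair<Int, String?>>): Map<Int, Set<String>> =
    copies
        .mapNotNull { (shardId, nodeName) -> nodeName?.let { shardId to it } } // drop unknown nodes
        .groupBy({ it.first }, { it.second })
        .mapValues { (_, names) -> names.toSet() }

fun main() {
    // Two shards with one replica each, plus one copy whose node is no longer known.
    val copies = listOf(0 to "node-a", 0 to "node-b", 1 to "node-b", 1 to "node-c", 1 to null)
    println(shardIdToNodeNames(copies)) // {0=[node-a, node-b], 1=[node-b, node-c]}
}

On the mutable-map version above, Kotlin's getOrPut would express the same containsKey/else branching in a single call.
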
@@ -466,6 +466,101 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
}
}

@Suppress("UNCHECKED_CAST")
fun `test shrink with replicas`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val nodes = getNodes()
if (nodes.size > 1) {
val indexName = "${testIndexName}_with_replicas"
val policyID = "${testIndexName}_with_replicas"
val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
percentageOfSourceShards = 0.5,
targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()),
aliases = null,
forceUnsafe = false,
index = 0
)
val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf()))

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 11L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID, null, "1", "3", "")
insertSampleData(indexName, 3)
// Will change the startTime each execution so that it triggers in 2 seconds
// First execution: Policy is initialized
val managedIndexConfig = getExistingManagedIndexConfig(indexName)
logger.info("index settings: \n ${getFlatSettings(indexName)}")

updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
// Starts AttemptMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)
val targetIndexName = indexName + testIndexSuffix
waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
targetIndexName,
getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName
)
assertEquals("true", getIndexBlocksWriteSetting(indexName))
val nodeName =
getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
assertNotNull("Couldn't find node to shrink onto.", nodeName)
val settings = getFlatSettings(indexName)
assertTrue(settings.containsKey("index.routing.allocation.require._name"))
assertEquals(nodeName, settings["index.routing.allocation.require._name"])
assertEquals(
AttemptMoveShardsStep.getSuccessMessage(nodeName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}

val nodeToShrink =
getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName

// starts WaitForMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
WaitForMoveShardsStep.getSuccessMessage(nodeToShrink),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
// Wait for move should finish before this. Starts AttemptShrinkStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(50)) {
assertTrue("Target index is not created", indexExists(targetIndexName))
assertEquals(
AttemptShrinkStep.getSuccessMessage(targetIndexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}

// starts WaitForShrinkStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
// one primary and one replica
assertTrue(getIndexShards(targetIndexName).size == 2)
assertEquals(
WaitForShrinkStep.SUCCESS_MESSAGE,
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String?
assertNull("Write block setting was not reset after successful shrink", writeBlock)
}
}
}

// TODO This test is excessively flaky, disabling for now but it needs to be fixed
private fun `test retries from first step`() {
val testPolicy = """