Skip to content

Commit

Permalink
Support switch aliases in shrink action.
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Kravchuk <[email protected]>
  • Loading branch information
ikibo authored and ikibo committed Oct 13, 2023
1 parent d42cc69 commit 61493f2
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ShrinkAction(
val percentageOfSourceShards: Double?,
val targetIndexTemplate: Script?,
val aliases: List<Alias>?,
val switchAliases: Boolean = false,
val forceUnsafe: Boolean?,
index: Int
) : Action(name, index) {
Expand Down Expand Up @@ -104,6 +105,7 @@ class ShrinkAction(
if (percentageOfSourceShards != null) builder.field(PERCENTAGE_OF_SOURCE_SHARDS_FIELD, percentageOfSourceShards)
if (targetIndexTemplate != null) builder.field(TARGET_INDEX_TEMPLATE_FIELD, targetIndexTemplate)
if (aliases != null) { builder.aliasesField(aliases) }
builder.field(SWITCH_ALIASES, switchAliases)
if (forceUnsafe != null) builder.field(FORCE_UNSAFE_FIELD, forceUnsafe)
builder.endObject()
}
Expand All @@ -120,6 +122,7 @@ class ShrinkAction(
} else {
out.writeBoolean(false)
}
out.writeBoolean(switchAliases)
out.writeOptionalBoolean(forceUnsafe)
out.writeInt(actionIndex)
}
Expand All @@ -131,6 +134,7 @@ class ShrinkAction(
const val MAX_SHARD_SIZE_FIELD = "max_shard_size"
const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template"
const val ALIASES_FIELD = "aliases"
const val SWITCH_ALIASES = "switch_aliases"
const val FORCE_UNSAFE_FIELD = "force_unsafe"
const val LOCK_SOURCE_JOB_ID = "shrink-node_name"
fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.C
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.MAX_SHARD_SIZE_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.NUM_NEW_SHARDS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.PERCENTAGE_OF_SOURCE_SHARDS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.SWITCH_ALIASES
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.TARGET_INDEX_TEMPLATE_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand All @@ -27,10 +28,11 @@ class ShrinkActionParser : ActionParser() {
val percentageOfSourceShards = sin.readOptionalDouble()
val targetIndexTemplate = if (sin.readBoolean()) Script(sin) else null
val aliases = if (sin.readBoolean()) sin.readList(::Alias) else null
val switchAliases = sin.readBoolean()
val forceUnsafe = sin.readOptionalBoolean()
val index = sin.readInt()

return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index)
}

@Suppress("NestedBlockDepth")
Expand All @@ -40,6 +42,7 @@ class ShrinkActionParser : ActionParser() {
var percentageOfSourceShards: Double? = null
var targetIndexTemplate: Script? = null
var aliases: List<Alias>? = null
var switchAliases = false
var forceUnsafe: Boolean? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -63,12 +66,13 @@ class ShrinkActionParser : ActionParser() {
}
}
}
SWITCH_ALIASES -> switchAliases = xcp.booleanValue()
FORCE_UNSAFE_FIELD -> forceUnsafe = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ShrinkAction.")
}
}

return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index)
}

override fun getActionType(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@

package org.opensearch.indexmanagement.indexstatemanagement.step.shrink

import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import java.time.Duration
Expand All @@ -45,6 +48,9 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) {
logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]")
}

switchAliases(context, localShrinkActionProperties)

stepStatus = StepStatus.COMPLETED
info = mapOf("message" to SUCCESS_MESSAGE)
return this
Expand Down Expand Up @@ -91,6 +97,62 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
}
}

private suspend fun switchAliases(context: StepContext, shrinkActionProperties: ShrinkActionProperties): Boolean {

val sourceIndexName = context.metadata.index
val targetIndexName = shrinkActionProperties.targetIndexName

if (!action.switchAliases) {
logger.info("Switch aliases disabled from [$sourceIndexName] to [$targetIndexName].")
return true
}

logger.info("Switching aliases from [$sourceIndexName] to [$targetIndexName].")

val targetIndexAliasesNames = context
.clusterService
.state()
.metadata()
.index(targetIndexName)
.aliases
.keys
val sourceIndexAliases = context
.clusterService
.state()
.metadata()
.index(sourceIndexName)
.aliases
.values

val aliasesToRemove = sourceIndexAliases.map { it.alias }.toTypedArray()
val req = IndicesAliasesRequest()
req.addAliasAction(AliasActions(AliasActions.Type.REMOVE).index(sourceIndexName).aliases(*aliasesToRemove))

sourceIndexAliases
.filterNot { targetIndexAliasesNames.contains(it.alias) }
.map {
AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
.index(targetIndexName)
.alias(it.alias)
.filter(it.filter?.string())
.indexRouting(it.indexRouting)
.searchRouting(it.searchRouting)
.isHidden(it.isHidden)
.writeIndex(it.writeIndex())
}
.forEach { req.addAliasAction(it) }

val aliasesAcked = context.client.admin().indices().suspendUntil { aliases(req, it) }.isAcknowledged

if (aliasesAcked) {
logger.info("Aliases switched successfully from [$sourceIndexName] to [$targetIndexName].")
} else {
logger.warn("Switching aliases from [$sourceIndexName] to [$targetIndexName] failed.")
}

return aliasesAcked
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
actionMetaData = currentMetadata.actionMetaData?.copy(
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@
"type": "object",
"enabled": false
},
"switch_aliases": {
"type": "boolean"
},
"force_unsafe": {
"type": "boolean"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,17 @@ fun randomShrinkAction(
percentageOfSourceShards: Double? = null,
targetIndexTemplate: Script? = if (randomBoolean()) randomTemplateScript(randomAlphaOfLength(10)) else null,
aliases: List<Alias>? = if (randomBoolean()) randomList(10) { randomAlias() } else null,
switchAliases: Boolean = randomBoolean(),
forceUnsafe: Boolean? = if (randomBoolean()) randomBoolean() else null
): ShrinkAction {
if (numNewShards == null && maxShardSize == null && percentageOfSourceShards == null) {
when (randomInt(2)) {
0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, forceUnsafe, 0)
1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, forceUnsafe, 0)
2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, forceUnsafe, 0)
0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
}
}
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, 0)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
}

fun randomReadOnlyActionConfig(): ReadOnlyAction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.logging.log4j.LogManager
import org.junit.Assert
import org.junit.Assume
import org.junit.Before
import org.opensearch.action.admin.indices.alias.Alias
Expand All @@ -17,6 +18,8 @@ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_R
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING
import org.opensearch.common.settings.Settings
import org.opensearch.core.common.unit.ByteSizeValue
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.MediaTypeRegistry
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
Expand All @@ -30,7 +33,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForSh
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.waitFor
import org.opensearch.core.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import java.time.Instant
Expand Down Expand Up @@ -237,6 +239,143 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
assertShrinkActionRun(indexName, policyID, excludedNode)
}

@Suppress("UNCHECKED_CAST")
fun `test switch aliases`() {
val indexName = "${testIndexName}_index_4"
val aliasToSwitch = Alias("${indexName}_alias_to_switch")
.writeIndex(false)
.isHidden(false)
.filter("""{"term":{"switch":"switch"}}""")
.routing("1")

val aliasToOverride = Alias("${indexName}_alias_to_override")
.writeIndex(true)
.isHidden(false)
.filter("""{"term":{"overridden":"overridden"}}""")
.routing("2")

val aliasToAdd = Alias("${indexName}_alias_to_add")
.writeIndex(false)
.isHidden(false)
.filter("""{"term":{"add":"add"}}""")
.routing("3")

val policyID = "${testIndexName}_testPolicyName_3"

val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
percentageOfSourceShards = 0.5,
targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()),
aliases = listOf(aliasToOverride, aliasToAdd),
switchAliases = true,
forceUnsafe = true,
index = 0
)
val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf()))

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 11L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(indexName, policyID, null, "0", "3", "")
changeAlias(
index = indexName, alias = aliasToSwitch.name(), action = "add", filter = aliasToSwitch.filter(), isWriteIndex = aliasToSwitch.writeIndex(), isHidden = aliasToSwitch.isHidden,
routing = aliasToSwitch.indexRouting().toInt(), indexRouting = aliasToSwitch.indexRouting().toInt(), searchRouting = aliasToSwitch.searchRouting().toInt()
)
changeAlias(
index = indexName, alias = aliasToOverride.name(), action = "add", filter = aliasToOverride.filter(), isWriteIndex = false, isHidden = aliasToOverride.isHidden,
routing = aliasToOverride.indexRouting().toInt(), indexRouting = aliasToOverride.indexRouting().toInt(), searchRouting = aliasToOverride.searchRouting().toInt()
)

insertSampleData(indexName, 3)

// Will change the startTime each execution so that it triggers in 2 seconds
// First execution: Policy is initialized
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
// Starts AttemptMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)

val targetIndexName = indexName + testIndexSuffix
waitFor(Instant.ofEpochSecond(60)) {
assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName)
assertEquals("true", getIndexBlocksWriteSetting(indexName))
assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName)
val settings = getFlatSettings(indexName)
val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
assertTrue(settings.containsKey("index.routing.allocation.require._name"))
assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"])
assertEquals(
AttemptMoveShardsStep.getSuccessMessage(nodeToShrink),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}

val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName

// starts WaitForMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
WaitForMoveShardsStep.getSuccessMessage(nodeToShrink),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
// Wait for move should finish before this. Starts AttemptShrinkStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(50)) {
assertTrue("Target index is not created", indexExists(targetIndexName))
assertEquals(
AttemptShrinkStep.getSuccessMessage(targetIndexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}

// starts WaitForShrinkStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
val sourceIndexAliases = getAlias(indexName, "")
assertTrue("Source index aliases list must be empty after alias switch.", sourceIndexAliases.isEmpty())

val targetIndexAliases = getAlias(targetIndexName, "")
assertEquals("Target index aliases count is incorrect.", 3, targetIndexAliases.size)

assertTrue("Target index must contain shrink action alias.", targetIndexAliases.containsKey(aliasToAdd.name()))
assertAliasesEqual(aliasToAdd, targetIndexAliases[aliasToAdd.name()])

assertTrue("Target index must contain switched source index alias.", targetIndexAliases.containsKey(aliasToSwitch.name()))
assertAliasesEqual(aliasToSwitch, targetIndexAliases[aliasToSwitch.name()])

assertTrue("Target index must contain shrink action alias which overrides source index alias.", targetIndexAliases.containsKey(aliasToOverride.name()))
assertAliasesEqual(aliasToOverride, targetIndexAliases[aliasToOverride.name()])
}
}

@Suppress("UNCHECKED_CAST")
private fun assertAliasesEqual(expectedAlas: Alias, actualAliasRaw: Any?) {
Assert.assertNotNull("Actual alias to compare must not be null.", actualAliasRaw)
val actualAlias = actualAliasRaw as Map<String, Any?>
assertEquals(expectedAlas.writeIndex() ?: false, actualAlias["is_write_index"] ?: false)
assertEquals(expectedAlas.isHidden ?: false, actualAlias["is_hidden"] ?: false)
assertEquals(expectedAlas.searchRouting(), actualAlias["search_routing"])
assertEquals(expectedAlas.indexRouting(), actualAlias["index_routing"])

val builder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON)
builder.map(actualAlias["filter"] as Map<String, Any>)
val actualFilter = builder.toString()
assertEquals(expectedAlas.filter(), actualFilter)
}

fun `test no-op with single source index primary shard`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val indexName = "${testIndexName}_index_shard_noop"
Expand Down

0 comments on commit 61493f2

Please sign in to comment.