Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…opensearch-project#589)

* Adds an alias action (opensearch-project#35)

Signed-off-by: Megha Goyal <[email protected]>

Co-authored-by: Megha Goyal <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and goyamegh authored Nov 3, 2022
1 parent 2ee5bec commit 2c8160a
Show file tree
Hide file tree
Showing 11 changed files with 611 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
Expand All @@ -34,6 +35,7 @@ class ISMActionsParser private constructor() {
}

val parsers = mutableListOf(
AliasActionParser(),
AllocationActionParser(),
CloseActionParser(),
DeleteActionParser(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.alias.AttemptAliasActionsStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

class AliasAction(
val actions: List<IndicesAliasesRequest.AliasActions>,
index: Int
) : Action(name, index) {

/**
* Allowing the alias action to be only applicable on the managed index for ADD and REMOVE actions only.
* https://github.com/opensearch-project/OpenSearch/blob/4d045a164e12a382881140e32f9285a3224fecc7/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java#L105
*/
init {
require(actions.isNotEmpty()) { "At least one alias action needs to be specified." }
val allowedActionTypes = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
require(actions.all { it.actionType() in allowedActionTypes }) { "Only ADD and REMOVE actions are allowed." }
require(
actions.all { it.indices().isNullOrEmpty() }
) { "Alias action can only work on its applied index so don't accept index/indices parameter." }
require(
actions.all { it.aliases().isNotEmpty() }
) { "At least one alias needs to be specified." }
}

private val attemptAliasActionsStep = AttemptAliasActionsStep(this)

private val steps = listOf(attemptAliasActionsStep)

override fun getStepToExecute(context: StepContext): Step {
return attemptAliasActionsStep
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ACTIONS, actions)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeList(actions)
out.writeInt(actionIndex)
}

companion object {
const val name = "alias"
const val ACTIONS = "actions"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction.Companion.ACTIONS
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class AliasActionParser : ActionParser() {

private val logger = LogManager.getLogger(javaClass)
override fun fromStreamInput(sin: StreamInput): Action {
val actions = sin.readList(IndicesAliasesRequest::AliasActions)
val index = sin.readInt()
return AliasAction(actions, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
val actions: MutableList<IndicesAliasesRequest.AliasActions> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ACTIONS -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(IndicesAliasesRequest.AliasActions.fromXContent(xcp))
}
}
else -> {
logger.error("Invalid field: [$fieldName] found in AliasAction.")
throw IllegalArgumentException("Invalid field: [$fieldName] found in AliasAction.")
}
}
}
return AliasAction(actions, index)
}

override fun getActionType(): String = AliasAction.name
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.alias

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData

class AttemptAliasActionsStep(private val action: AliasAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
val request = IndicesAliasesRequest()
action.actions.forEach {
// Applying the actions on the managed index.
it.indices(indexName)
request.addAliasAction(it)
}
val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { aliases(request, it) }
handleResponse(response, indexName, action.actions)
} catch (e: Exception) {
handleException(e, indexName, action.actions)
}
return this
}

private fun handleException(e: Exception, indexName: String, actions: List<IndicesAliasesRequest.AliasActions>) {
val message = getFailedMessage(indexName, actions)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

private fun handleResponse(
response: AcknowledgedResponse,
indexName: String,
actions: List<IndicesAliasesRequest.AliasActions>
) {
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to getFailedMessage(indexName, actions))
}
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

override fun isIdempotent() = true

companion object {
val validTopContextFields = setOf("index")
const val name = "attempt_alias"
fun getFailedMessage(
index: String,
actions: List<IndicesAliasesRequest.AliasActions>
) = "Failed to update alias [index=$index] for actions: [actions=$actions]"

fun getSuccessMessage(index: String) = "Successfully updated alias [index=$index]"
}
}
10 changes: 9 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 16
"schema_version": 17
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -159,6 +159,14 @@
}
}
},
"alias": {
"properties": {
"actions": {
"type": "object",
"enabled": false
}
}
},
"delete": {
"type": "object"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 16
val configSchemaVersion = 17
val historySchemaVersion = 5

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package org.opensearch.indexmanagement.indexstatemanagement

import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.index.RandomCreateIndexGenerator.randomAlias
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
Expand Down Expand Up @@ -206,6 +208,24 @@ fun randomOpenActionConfig(): OpenAction {
return OpenAction(index = 0)
}

fun randomAliasAction(includeIndices: Boolean = false): AliasAction {
val actions = List(OpenSearchRestTestCase.randomIntBetween(1, 10)) { if (includeIndices) randomAliasActionWithIndices() else randomAliasActions() }
return AliasAction(actions = actions, index = 0)
}

fun randomAliasActions(): IndicesAliasesRequest.AliasActions {
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
}

fun randomAliasActionWithIndices(): IndicesAliasesRequest.AliasActions {
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
.indices(OpenSearchRestTestCase.randomAlphaOfLength(10))
}

fun randomDestination(type: DestinationType = randomDestinationType()): Destination {
return Destination(
type = type,
Expand Down Expand Up @@ -478,6 +498,11 @@ fun OpenAction.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun AliasAction.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ISMTemplate.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Loading

0 comments on commit 2c8160a

Please sign in to comment.