Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

actionify change policy API #303

Merged
merged 4 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ out/
*.iws
.DS_Store
*.log
http
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.TransportAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
Expand Down Expand Up @@ -134,7 +136,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
RestRetryFailedManagedIndexAction(),
RestAddPolicyAction(),
RestRemovePolicyAction(),
RestChangePolicyAction(clusterService)
RestChangePolicyAction()
)
}

Expand Down Expand Up @@ -201,7 +203,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java)
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java),
ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -39,7 +42,15 @@ data class ChangePolicy(
val state: String?,
val include: List<StateFilter>,
val isSafe: Boolean
) : ToXContentObject {
) : Writeable, ToXContentObject {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
policyID = sin.readString(),
state = sin.readOptionalString(),
include = sin.readList(::StateFilter),
isSafe = sin.readBoolean()
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
Expand All @@ -51,6 +62,14 @@ data class ChangePolicy(
return builder
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(policyID)
out.writeOptionalString(state)
out.writeList(include)
out.writeBoolean(isSafe)
}

companion object {
const val POLICY_ID_FIELD = "policy_id"
const val STATE_FIELD = "state"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model

import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

data class StateFilter(val state: String) {
data class StateFilter(val state: String) : Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
state = sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(state)
}

companion object {
const val STATE_FIELD = "state"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,24 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ChangePolicy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILURES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isSafeToChange
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.updateManagedIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyRequest
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.get.MultiGetRequest
import org.elasticsearch.action.get.MultiGetResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException

class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandler() {
class RestChangePolicyAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

Expand All @@ -89,202 +56,13 @@ class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandl
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
val changePolicy = ChangePolicy.parse(xcp)

return RestChannelConsumer { channel ->
ChangePolicyHandler(client, channel, indices, changePolicy).start()
}
}

inner class ChangePolicyHandler(
client: NodeClient,
channel: RestChannel,
private val indices: Array<String>,
private val changePolicy: ChangePolicy
) : AsyncActionHandler(client, channel) {

private val failedIndices = mutableListOf<FailedIndex>()
private val managedIndexUuids = mutableListOf<Pair<String, String>>()
private val indexUuidToCurrentState = mutableMapOf<String, String>()
lateinit var policy: Policy
lateinit var response: GetResponse

fun start() {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, changePolicy.policyID)

client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure))
}

private fun onGetPolicyResponse(response: GetResponse) {
if (!response.isExists || response.isSourceEmpty) {
return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, "Could not find policy=${changePolicy.policyID}"))
}
this.response = response
IndexUtils.checkAndUpdateConfigIndexMapping(
clusterService.state(),
client.admin().indices(),
ActionListener.wrap(::onUpdateMapping, ::onFailure)
)
}

private fun onUpdateMapping(acknowledgedResponse: AcknowledgedResponse) {
if (!acknowledgedResponse.isAcknowledged) {
return channel.sendResponse(BytesRestResponse(RestStatus.FAILED_DEPENDENCY,
"Could not update $INDEX_MANAGEMENT_INDEX with new mapping."))
}
policy = XContentHelper.createParser(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef,
XContentType.JSON
).use { Policy.parseWithType(it, response.id, response.seqNo, response.primaryTerm) }

getClusterState()
}

@Suppress("SpreadOperator")
private fun getClusterState() {
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metadata(true)
.local(false)
.indicesOptions(IndicesOptions.strictExpand())

client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(::processResponse, ::onFailure))
}

@Suppress("ComplexMethod")
private fun processResponse(response: ClusterStateResponse) {
val includedStates = changePolicy.include.map { it.state }.toSet()
response.state.metadata.indices.forEach {
val indexMetaData = it.value
val currentState = indexMetaData.getManagedIndexMetaData()?.stateMetaData?.name
if (currentState != null) {
indexUuidToCurrentState[indexMetaData.indexUUID] = currentState
}
when {
// If there is no policyID on the index then it's not currently being managed
indexMetaData.getPolicyID() == null ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, INDEX_NOT_MANAGED))
// else if there exists a transitionTo on the ManagedIndexMetaData then we will
// fail as they might not of meant to add a ChangePolicy when its on the next state
indexMetaData.getManagedIndexMetaData()?.transitionTo != null ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, INDEX_IN_TRANSITION))
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely
indexMetaData.getManagedIndexMetaData() == null ->
managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index
includedStates.isEmpty() -> managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else only update the managed index if its currently in one of the included states
includedStates.contains(indexMetaData.getManagedIndexMetaData()?.stateMetaData?.name) ->
managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else the managed index did not match any of the included state filters and we will not update it
else -> log.debug("Skipping ${indexMetaData.index.name} as it does not match any of the include state filters")
}
}
val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy)

if (managedIndexUuids.isEmpty()) {
channel.sendResponse(getRestResponse(0, failedIndices))
} else {
client.multiGet(
getManagedIndexConfigMultiGetRequest(managedIndexUuids.map { (_, indexUuid) -> indexUuid }.toTypedArray()),
ActionListener.wrap(::onMultiGetResponse, ::onFailure)
)
}
}

private fun onMultiGetResponse(response: MultiGetResponse) {
val foundManagedIndices = mutableSetOf<String>()
val sweptConfigs = response.responses.mapNotNull {
// The id is the index uuid
if (!it.isFailed && it.response != null) {
foundManagedIndices.add(it.response.id)
SweptManagedIndexConfig.parseWithType(contentParser(it.response.sourceAsBytesRef), it.response.seqNo, it.response.primaryTerm)
} else {
null
}
}

// If we do not find a matching ManagedIndexConfig for one of the provided managedIndexUuids
// it means that we have not yet created a job for that index (which can happen during the small
// gap of adding a policy_id to an index and a job being created by the coordinator, or the coordinator
// failing to create a job and waiting for the sweep to create the job). We will add these as failed indices
// that can not be updated from the ChangePolicy yet.
managedIndexUuids.forEach {
val (index, indexUuid) = it
if (!foundManagedIndices.contains(indexUuid)) {
failedIndices.add(FailedIndex(index, indexUuid, INDEX_NOT_INITIALIZED))
}
}

if (sweptConfigs.isEmpty()) {
channel.sendResponse(getRestResponse(0, failedIndices))
} else {
updateManagedIndexConfig(sweptConfigs)
}
}

private fun updateManagedIndexConfig(sweptConfigs: List<SweptManagedIndexConfig>) {
val mapOfItemIdToIndex = mutableMapOf<Int, Pair<String, String>>()
val bulkRequest = BulkRequest()
sweptConfigs.forEachIndexed { index, sweptConfig ->
// compare the sweptconfig policy to the get policy here and update changePolicy
val currentStateName = indexUuidToCurrentState[sweptConfig.uuid]
val updatedChangePolicy = changePolicy
.copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true)
bulkRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy)))
mapOfItemIdToIndex[index] = sweptConfig.index to sweptConfig.uuid
}
client.bulk(bulkRequest, onBulkResponse(mapOfItemIdToIndex))
}

private fun onBulkResponse(mapOfItemIdToIndex: Map<Int, Pair<String, String>>): RestResponseListener<BulkResponse> {
return object : RestResponseListener<BulkResponse>(channel) {
override fun buildResponse(bulkResponse: BulkResponse): RestResponse {
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
failedResponses.forEach {
val indexPair = mapOfItemIdToIndex[it.itemId]
if (indexPair != null) {
failedIndices.add(FailedIndex(indexPair.first, indexPair.second, it.failureMessage))
}
}

return getRestResponse((bulkResponse.items ?: arrayOf()).size - failedResponses.size, failedIndices)
}
}
}

private fun getRestResponse(updated: Int, failedIndices: List<FailedIndex>): BytesRestResponse {
val builder = channel.newBuilder()
.startObject()
.field(UPDATED_INDICES, updated)
.field(FAILURES, failedIndices.isNotEmpty())
.field(FAILED_INDICES, failedIndices)
.endObject()
return BytesRestResponse(RestStatus.OK, builder)
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
return XContentHelper.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)
return RestChannelConsumer { channel ->
client.execute(ChangePolicyAction.INSTANCE, changePolicyRequest, RestToXContentListener(channel))
}
}

@Suppress("SpreadOperator")
private fun getManagedIndexConfigMultiGetRequest(managedIndexUuids: Array<String>): MultiGetRequest {
val request = MultiGetRequest()
val includes = arrayOf(
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_UUID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.POLICY_ID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.POLICY_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.CHANGE_POLICY_FIELD}"
)
val excludes = emptyArray<String>()
val fetchSourceContext = FetchSourceContext(true, includes, excludes)
managedIndexUuids.forEach { request.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, it).fetchSourceContext(fetchSourceContext)) }
return request
}

companion object {
const val CHANGE_POLICY_BASE_URI = "$ISM_BASE_URI/change_policy"
const val INDEX_NOT_MANAGED = "This index is not being managed"
Expand Down
Loading