Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
actionify change policy API (#303)
Browse files Browse the repository at this point in the history
* actionify change policy API

* refactor listener
  • Loading branch information
bowenlan-amzn authored Oct 9, 2020
1 parent 5931cdd commit 421c3ef
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 236 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ out/
*.iws
.DS_Store
*.log
http
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.TransportAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
Expand Down Expand Up @@ -134,7 +136,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
RestRetryFailedManagedIndexAction(),
RestAddPolicyAction(),
RestRemovePolicyAction(),
RestChangePolicyAction(clusterService)
RestChangePolicyAction()
)
}

Expand Down Expand Up @@ -201,7 +203,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java)
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java),
ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -39,7 +42,15 @@ data class ChangePolicy(
val state: String?,
val include: List<StateFilter>,
val isSafe: Boolean
) : ToXContentObject {
) : Writeable, ToXContentObject {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
policyID = sin.readString(),
state = sin.readOptionalString(),
include = sin.readList(::StateFilter),
isSafe = sin.readBoolean()
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
Expand All @@ -51,6 +62,14 @@ data class ChangePolicy(
return builder
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(policyID)
out.writeOptionalString(state)
out.writeList(include)
out.writeBoolean(isSafe)
}

companion object {
const val POLICY_ID_FIELD = "policy_id"
const val STATE_FIELD = "state"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model

import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

data class StateFilter(val state: String) {
data class StateFilter(val state: String) : Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
state = sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(state)
}

companion object {
const val STATE_FIELD = "state"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,24 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ChangePolicy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILURES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isSafeToChange
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.updateManagedIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyRequest
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.get.MultiGetRequest
import org.elasticsearch.action.get.MultiGetResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException

class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandler() {
class RestChangePolicyAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

Expand All @@ -89,202 +56,13 @@ class RestChangePolicyAction(val clusterService: ClusterService) : BaseRestHandl
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
val changePolicy = ChangePolicy.parse(xcp)

return RestChannelConsumer { channel ->
ChangePolicyHandler(client, channel, indices, changePolicy).start()
}
}

inner class ChangePolicyHandler(
client: NodeClient,
channel: RestChannel,
private val indices: Array<String>,
private val changePolicy: ChangePolicy
) : AsyncActionHandler(client, channel) {

private val failedIndices = mutableListOf<FailedIndex>()
private val managedIndexUuids = mutableListOf<Pair<String, String>>()
private val indexUuidToCurrentState = mutableMapOf<String, String>()
lateinit var policy: Policy
lateinit var response: GetResponse

fun start() {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, changePolicy.policyID)

client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure))
}

private fun onGetPolicyResponse(response: GetResponse) {
if (!response.isExists || response.isSourceEmpty) {
return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, "Could not find policy=${changePolicy.policyID}"))
}
this.response = response
IndexUtils.checkAndUpdateConfigIndexMapping(
clusterService.state(),
client.admin().indices(),
ActionListener.wrap(::onUpdateMapping, ::onFailure)
)
}

private fun onUpdateMapping(acknowledgedResponse: AcknowledgedResponse) {
if (!acknowledgedResponse.isAcknowledged) {
return channel.sendResponse(BytesRestResponse(RestStatus.FAILED_DEPENDENCY,
"Could not update $INDEX_MANAGEMENT_INDEX with new mapping."))
}
policy = XContentHelper.createParser(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef,
XContentType.JSON
).use { Policy.parseWithType(it, response.id, response.seqNo, response.primaryTerm) }

getClusterState()
}

@Suppress("SpreadOperator")
private fun getClusterState() {
val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metadata(true)
.local(false)
.indicesOptions(IndicesOptions.strictExpand())

client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(::processResponse, ::onFailure))
}

@Suppress("ComplexMethod")
private fun processResponse(response: ClusterStateResponse) {
val includedStates = changePolicy.include.map { it.state }.toSet()
response.state.metadata.indices.forEach {
val indexMetaData = it.value
val currentState = indexMetaData.getManagedIndexMetaData()?.stateMetaData?.name
if (currentState != null) {
indexUuidToCurrentState[indexMetaData.indexUUID] = currentState
}
when {
// If there is no policyID on the index then it's not currently being managed
indexMetaData.getPolicyID() == null ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, INDEX_NOT_MANAGED))
// else if there exists a transitionTo on the ManagedIndexMetaData then we will
// fail as they might not of meant to add a ChangePolicy when its on the next state
indexMetaData.getManagedIndexMetaData()?.transitionTo != null ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, INDEX_IN_TRANSITION))
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely
indexMetaData.getManagedIndexMetaData() == null ->
managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index
includedStates.isEmpty() -> managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else only update the managed index if its currently in one of the included states
includedStates.contains(indexMetaData.getManagedIndexMetaData()?.stateMetaData?.name) ->
managedIndexUuids.add(indexMetaData.index.name to indexMetaData.index.uuid)
// else the managed index did not match any of the included state filters and we will not update it
else -> log.debug("Skipping ${indexMetaData.index.name} as it does not match any of the include state filters")
}
}
val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy)

if (managedIndexUuids.isEmpty()) {
channel.sendResponse(getRestResponse(0, failedIndices))
} else {
client.multiGet(
getManagedIndexConfigMultiGetRequest(managedIndexUuids.map { (_, indexUuid) -> indexUuid }.toTypedArray()),
ActionListener.wrap(::onMultiGetResponse, ::onFailure)
)
}
}

private fun onMultiGetResponse(response: MultiGetResponse) {
val foundManagedIndices = mutableSetOf<String>()
val sweptConfigs = response.responses.mapNotNull {
// The id is the index uuid
if (!it.isFailed && it.response != null) {
foundManagedIndices.add(it.response.id)
SweptManagedIndexConfig.parseWithType(contentParser(it.response.sourceAsBytesRef), it.response.seqNo, it.response.primaryTerm)
} else {
null
}
}

// If we do not find a matching ManagedIndexConfig for one of the provided managedIndexUuids
// it means that we have not yet created a job for that index (which can happen during the small
// gap of adding a policy_id to an index and a job being created by the coordinator, or the coordinator
// failing to create a job and waiting for the sweep to create the job). We will add these as failed indices
// that can not be updated from the ChangePolicy yet.
managedIndexUuids.forEach {
val (index, indexUuid) = it
if (!foundManagedIndices.contains(indexUuid)) {
failedIndices.add(FailedIndex(index, indexUuid, INDEX_NOT_INITIALIZED))
}
}

if (sweptConfigs.isEmpty()) {
channel.sendResponse(getRestResponse(0, failedIndices))
} else {
updateManagedIndexConfig(sweptConfigs)
}
}

private fun updateManagedIndexConfig(sweptConfigs: List<SweptManagedIndexConfig>) {
val mapOfItemIdToIndex = mutableMapOf<Int, Pair<String, String>>()
val bulkRequest = BulkRequest()
sweptConfigs.forEachIndexed { index, sweptConfig ->
// compare the sweptconfig policy to the get policy here and update changePolicy
val currentStateName = indexUuidToCurrentState[sweptConfig.uuid]
val updatedChangePolicy = changePolicy
.copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true)
bulkRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy)))
mapOfItemIdToIndex[index] = sweptConfig.index to sweptConfig.uuid
}
client.bulk(bulkRequest, onBulkResponse(mapOfItemIdToIndex))
}

private fun onBulkResponse(mapOfItemIdToIndex: Map<Int, Pair<String, String>>): RestResponseListener<BulkResponse> {
return object : RestResponseListener<BulkResponse>(channel) {
override fun buildResponse(bulkResponse: BulkResponse): RestResponse {
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
failedResponses.forEach {
val indexPair = mapOfItemIdToIndex[it.itemId]
if (indexPair != null) {
failedIndices.add(FailedIndex(indexPair.first, indexPair.second, it.failureMessage))
}
}

return getRestResponse((bulkResponse.items ?: arrayOf()).size - failedResponses.size, failedIndices)
}
}
}

private fun getRestResponse(updated: Int, failedIndices: List<FailedIndex>): BytesRestResponse {
val builder = channel.newBuilder()
.startObject()
.field(UPDATED_INDICES, updated)
.field(FAILURES, failedIndices.isNotEmpty())
.field(FAILED_INDICES, failedIndices)
.endObject()
return BytesRestResponse(RestStatus.OK, builder)
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
return XContentHelper.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)
return RestChannelConsumer { channel ->
client.execute(ChangePolicyAction.INSTANCE, changePolicyRequest, RestToXContentListener(channel))
}
}

@Suppress("SpreadOperator")
private fun getManagedIndexConfigMultiGetRequest(managedIndexUuids: Array<String>): MultiGetRequest {
val request = MultiGetRequest()
val includes = arrayOf(
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_UUID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.POLICY_ID_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.POLICY_FIELD}",
"${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.CHANGE_POLICY_FIELD}"
)
val excludes = emptyArray<String>()
val fetchSourceContext = FetchSourceContext(true, includes, excludes)
managedIndexUuids.forEach { request.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, it).fetchSourceContext(fetchSourceContext)) }
return request
}

companion object {
const val CHANGE_POLICY_BASE_URI = "$ISM_BASE_URI/change_policy"
const val INDEX_NOT_MANAGED = "This index is not being managed"
Expand Down
Loading

0 comments on commit 421c3ef

Please sign in to comment.