Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
actionify add policy API (#298)
Browse files Browse the repository at this point in the history
* actionify add policy API
  • Loading branch information
bowenlan-amzn authored Oct 7, 2020
1 parent b240a15 commit 4df177b
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.TransportAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
Expand Down Expand Up @@ -195,7 +197,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
return listOf(
ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java),
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java)
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,18 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.buildInvalidIndexResponse
import org.elasticsearch.ElasticsearchTimeoutException
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyRequest
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.Strings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.index.Index
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException
import java.time.Duration
import java.time.Instant

class RestAddPolicyAction : BaseRestHandler() {

Expand Down Expand Up @@ -79,115 +57,14 @@ class RestAddPolicyAction : BaseRestHandler() {

val policyID = requireNotNull(body.getOrDefault("policy_id", null)) { "Missing policy_id" }

val strictExpandOptions = IndicesOptions.strictExpand()
val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String)

val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))
.indicesOptions(strictExpandOptions)

val startTime = Instant.now()
return RestChannelConsumer {
client.admin()
.cluster()
.state(clusterStateRequest, AddPolicyHandler(client, it, policyID as String, startTime))
}
}

inner class AddPolicyHandler(
private val client: NodeClient,
channel: RestChannel,
private val policyID: String,
private val startTime: Instant
) : RestActionListener<ClusterStateResponse>(channel) {

private val failedIndices: MutableList<FailedIndex> = mutableListOf()
private val indicesToAddPolicyTo: MutableList<Index> = mutableListOf()

@Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator.
override fun processResponse(clusterStateResponse: ClusterStateResponse) {
val state = clusterStateResponse.state
populateLists(state)

val builder = channel.newBuilder().startObject()
if (indicesToAddPolicyTo.isNotEmpty()) {
val timeSinceClusterStateRequest: Duration = Duration.between(startTime, Instant.now())

// Timeout for UpdateSettingsRequest in milliseconds
val updateSettingsTimeout = ADD_POLICY_TIMEOUT_IN_MILLIS - timeSinceClusterStateRequest.toMillis()

// If after the ClusterStateResponse we go over the timeout for Add Policy (30 seconds), throw an
// exception since UpdateSettingsRequest cannot have a negative timeout
if (updateSettingsTimeout < 0) {
throw ElasticsearchTimeoutException("Add policy API timed out after ClusterStateResponse")
}

val updateSettingsRequest = UpdateSettingsRequest()
.indices(*indicesToAddPolicyTo.map { it.name }.toTypedArray())
.settings(Settings.builder().put(ManagedIndexSettings.POLICY_ID.key, policyID))
.timeout(TimeValue.timeValueMillis(updateSettingsTimeout))

try {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest,
object : RestResponseListener<AcknowledgedResponse>(channel) {
override fun buildResponse(response: AcknowledgedResponse): RestResponse {
if (response.isAcknowledged) {
builder.field(UPDATED_INDICES, indicesToAddPolicyTo.size)
} else {
builder.field(UPDATED_INDICES, 0)
failedIndices.addAll(indicesToAddPolicyTo.map {
FailedIndex(it.name, it.uuid, "Failed to add policy")
})
}

buildInvalidIndexResponse(builder, failedIndices)
return BytesRestResponse(RestStatus.OK, builder.endObject())
}
}
)
} catch (e: ClusterBlockException) {
failedIndices.addAll(indicesToAddPolicyTo.map {
FailedIndex(it.name, it.uuid, "Failed to add policy due to ClusterBlockingException: ${e.message}"
)
})

builder.field(UPDATED_INDICES, 0)
buildInvalidIndexResponse(builder, failedIndices)
channel.sendResponse(BytesRestResponse(RestStatus.OK, builder.endObject()))
}
} else {
builder.field(UPDATED_INDICES, 0)
buildInvalidIndexResponse(builder, failedIndices)
channel.sendResponse(BytesRestResponse(RestStatus.OK, builder.endObject()))
}
}

private fun populateLists(state: ClusterState) {
for (indexMetaDataEntry in state.metadata.indices) {
val indexMetaData = indexMetaDataEntry.value
when {
indexMetaData.getPolicyID() != null ->
failedIndices.add(
FailedIndex(
indexMetaData.index.name,
indexMetaData.index.uuid,
"This index already has a policy, use the update policy API to update index policies"
)
)
indexMetaData.state == IndexMetadata.State.CLOSE ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, "This index is closed"))
else -> indicesToAddPolicyTo.add(indexMetaData.index)
}
}
return RestChannelConsumer { channel ->
client.execute(AddPolicyAction.INSTANCE, addPolicyRequest, RestToXContentListener(channel))
}
}

companion object {
const val ADD_POLICY_BASE_URI = "$ISM_BASE_URI/add"

const val ADD_POLICY_TIMEOUT_IN_MILLIS = 30000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ open class ISMStatusResponse : ActionResponse, ToXContentObject {
buildInvalidIndexResponse(builder, failedIndices)
return builder.endObject()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import org.elasticsearch.test.ESTestCase
import org.junit.Assert
import org.elasticsearch.action.ActionType

class RemovePolicyActionTests : ESTestCase() {

fun `test remove policy action name`() {
Assert.assertNotNull(RemovePolicyAction.INSTANCE.name())
Assert.assertEquals(RemovePolicyAction.INSTANCE.name(), RemovePolicyAction.NAME)
class AddPolicyAction private constructor() : ActionType<AddPolicyResponse>(NAME, ::AddPolicyResponse) {
companion object {
val INSTANCE = AddPolicyAction()
val NAME = "cluster:admin/opendistro/ism/managedindex/add"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionRequestValidationException
import org.elasticsearch.action.ValidateActions
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import java.io.IOException
import java.util.Collections

class AddPolicyRequest : ActionRequest {

val indices: List<String>
val policyID: String?

constructor(
indices: List<String>,
policyID: String?
) : super() {
this.indices = indices
this.policyID = policyID
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Collections.unmodifiableList(sin.readStringList()),
sin.readString()
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (indices.isEmpty() || policyID.isNullOrBlank()) {
validationException = ValidateActions.addValidationError("Missing indices or policyID", validationException)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringCollection(indices)
out.writeString(policyID)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import org.elasticsearch.common.io.stream.StreamInput

class AddPolicyResponse : ISMStatusResponse {
constructor(updated: Int, failedIndices: List<FailedIndex>) : super(updated, failedIndices)
constructor(sin: StreamInput) : super(sin)
}
Loading

0 comments on commit 4df177b

Please sign in to comment.