Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

actionify add policy API #298

Merged
merged 2 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.TransportAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
Expand Down Expand Up @@ -195,7 +197,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
return listOf(
ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java),
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java)
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,18 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.buildInvalidIndexResponse
import org.elasticsearch.ElasticsearchTimeoutException
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyRequest
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.Strings
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.index.Index
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException
import java.time.Duration
import java.time.Instant

class RestAddPolicyAction : BaseRestHandler() {

Expand Down Expand Up @@ -79,115 +57,14 @@ class RestAddPolicyAction : BaseRestHandler() {

val policyID = requireNotNull(body.getOrDefault("policy_id", null)) { "Missing policy_id" }

val strictExpandOptions = IndicesOptions.strictExpand()
val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String)

val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*indices)
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))
.indicesOptions(strictExpandOptions)

val startTime = Instant.now()
return RestChannelConsumer {
client.admin()
.cluster()
.state(clusterStateRequest, AddPolicyHandler(client, it, policyID as String, startTime))
}
}

inner class AddPolicyHandler(
private val client: NodeClient,
channel: RestChannel,
private val policyID: String,
private val startTime: Instant
) : RestActionListener<ClusterStateResponse>(channel) {

private val failedIndices: MutableList<FailedIndex> = mutableListOf()
private val indicesToAddPolicyTo: MutableList<Index> = mutableListOf()

@Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator.
override fun processResponse(clusterStateResponse: ClusterStateResponse) {
val state = clusterStateResponse.state
populateLists(state)

val builder = channel.newBuilder().startObject()
if (indicesToAddPolicyTo.isNotEmpty()) {
val timeSinceClusterStateRequest: Duration = Duration.between(startTime, Instant.now())

// Timeout for UpdateSettingsRequest in milliseconds
val updateSettingsTimeout = ADD_POLICY_TIMEOUT_IN_MILLIS - timeSinceClusterStateRequest.toMillis()

// If after the ClusterStateResponse we go over the timeout for Add Policy (30 seconds), throw an
// exception since UpdateSettingsRequest cannot have a negative timeout
if (updateSettingsTimeout < 0) {
throw ElasticsearchTimeoutException("Add policy API timed out after ClusterStateResponse")
}

val updateSettingsRequest = UpdateSettingsRequest()
.indices(*indicesToAddPolicyTo.map { it.name }.toTypedArray())
.settings(Settings.builder().put(ManagedIndexSettings.POLICY_ID.key, policyID))
.timeout(TimeValue.timeValueMillis(updateSettingsTimeout))

try {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest,
object : RestResponseListener<AcknowledgedResponse>(channel) {
override fun buildResponse(response: AcknowledgedResponse): RestResponse {
if (response.isAcknowledged) {
builder.field(UPDATED_INDICES, indicesToAddPolicyTo.size)
} else {
builder.field(UPDATED_INDICES, 0)
failedIndices.addAll(indicesToAddPolicyTo.map {
FailedIndex(it.name, it.uuid, "Failed to add policy")
})
}

buildInvalidIndexResponse(builder, failedIndices)
return BytesRestResponse(RestStatus.OK, builder.endObject())
}
}
)
} catch (e: ClusterBlockException) {
failedIndices.addAll(indicesToAddPolicyTo.map {
FailedIndex(it.name, it.uuid, "Failed to add policy due to ClusterBlockingException: ${e.message}"
)
})

builder.field(UPDATED_INDICES, 0)
buildInvalidIndexResponse(builder, failedIndices)
channel.sendResponse(BytesRestResponse(RestStatus.OK, builder.endObject()))
}
} else {
builder.field(UPDATED_INDICES, 0)
buildInvalidIndexResponse(builder, failedIndices)
channel.sendResponse(BytesRestResponse(RestStatus.OK, builder.endObject()))
}
}

private fun populateLists(state: ClusterState) {
for (indexMetaDataEntry in state.metadata.indices) {
val indexMetaData = indexMetaDataEntry.value
when {
indexMetaData.getPolicyID() != null ->
failedIndices.add(
FailedIndex(
indexMetaData.index.name,
indexMetaData.index.uuid,
"This index already has a policy, use the update policy API to update index policies"
)
)
indexMetaData.state == IndexMetadata.State.CLOSE ->
failedIndices.add(FailedIndex(indexMetaData.index.name, indexMetaData.index.uuid, "This index is closed"))
else -> indicesToAddPolicyTo.add(indexMetaData.index)
}
}
return RestChannelConsumer { channel ->
client.execute(AddPolicyAction.INSTANCE, addPolicyRequest, RestToXContentListener(channel))
}
}

companion object {
const val ADD_POLICY_BASE_URI = "$ISM_BASE_URI/add"

const val ADD_POLICY_TIMEOUT_IN_MILLIS = 30000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ open class ISMStatusResponse : ActionResponse, ToXContentObject {
buildInvalidIndexResponse(builder, failedIndices)
return builder.endObject()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import org.elasticsearch.test.ESTestCase
import org.junit.Assert
import org.elasticsearch.action.ActionType

class RemovePolicyActionTests : ESTestCase() {

fun `test remove policy action name`() {
Assert.assertNotNull(RemovePolicyAction.INSTANCE.name())
Assert.assertEquals(RemovePolicyAction.INSTANCE.name(), RemovePolicyAction.NAME)
class AddPolicyAction private constructor() : ActionType<AddPolicyResponse>(NAME, ::AddPolicyResponse) {
companion object {
val INSTANCE = AddPolicyAction()
val NAME = "cluster:admin/opendistro/ism/managedindex/add"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionRequestValidationException
import org.elasticsearch.action.ValidateActions
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import java.io.IOException
import java.util.Collections

class AddPolicyRequest : ActionRequest {

val indices: List<String>
val policyID: String?

constructor(
indices: List<String>,
policyID: String?
) : super() {
this.indices = indices
this.policyID = policyID
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Collections.unmodifiableList(sin.readStringList()),
sin.readString()
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (indices.isEmpty() || policyID.isNullOrBlank()) {
validationException = ValidateActions.addValidationError("Missing indices or policyID", validationException)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringCollection(indices)
out.writeString(policyID)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FailedIndex
import org.elasticsearch.common.io.stream.StreamInput

class AddPolicyResponse : ISMStatusResponse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need a subclass when there is nothing extra being added? Any reason not to just use ISMStatusResponse?

Copy link
Contributor Author

@bowenlan-amzn bowenlan-amzn Sep 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No big reason, just to look orderly, should be easier to read and understand I guess

constructor(updated: Int, failedIndices: List<FailedIndex>) : super(updated, failedIndices)
constructor(sin: StreamInput) : super(sin)
}
Loading