Skip to content

Commit

Permalink
CRUD api for long running operation notification config (#722)
Browse files Browse the repository at this point in the history
* notification crud api

Signed-off-by: zhichao-aws <[email protected]>

* changes based on comments; add tests

Signed-off-by: zhichao-aws <[email protected]>

* changes for comment

Signed-off-by: zhichao-aws <[email protected]>

* change behavior for get action index not exists, fix test cases

Signed-off-by: zhichao-aws <[email protected]>

* fix weak password for changes in security plugin

Signed-off-by: zhichao-aws <[email protected]>

---------

Signed-off-by: zhichao-aws <[email protected]>
  • Loading branch information
zhichao-aws authored May 25, 2023
1 parent 30e16d9 commit 96f7380
Show file tree
Hide file tree
Showing 33 changed files with 2,363 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices
import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.delete.TransportDeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.get.TransportGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.TransportIndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestDeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestIndexLRONConfigAction
import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService
import org.opensearch.indexmanagement.indexstatemanagement.ExtensionStatusChecker
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
Expand Down Expand Up @@ -201,11 +211,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
companion object {
const val PLUGINS_BASE_URI = "/_plugins"
const val ISM_BASE_URI = "$PLUGINS_BASE_URI/_ism"
const val IM_BASE_URI = "$PLUGINS_BASE_URI/_im"
const val ROLLUP_BASE_URI = "$PLUGINS_BASE_URI/_rollup"
const val TRANSFORM_BASE_URI = "$PLUGINS_BASE_URI/_transform"
const val LRON_BASE_URI = "$IM_BASE_URI/lron"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val ROLLUP_JOBS_BASE_URI = "$ROLLUP_BASE_URI/jobs"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val CONTROL_CENTER_INDEX = ".opensearch-control-center"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

Expand Down Expand Up @@ -338,7 +351,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestExplainSMPolicyHandler(),
RestDeleteSMPolicyHandler(),
RestCreateSMPolicyHandler(),
RestUpdateSMPolicyHandler()
RestUpdateSMPolicyHandler(),
RestIndexLRONConfigAction(),
RestGetLRONConfigAction(),
RestDeleteLRONConfigAction()
)
}

Expand Down Expand Up @@ -396,6 +412,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerConsumers()
.registerClusterConfigurationProvider(skipFlag)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
val controlCenterIndices = ControlCenterIndices(client.admin().indices(), clusterService)
actionValidation = ActionValidation(settings, clusterService, jvmService)
val indexStateManagementHistory =
IndexStateManagementHistory(
Expand Down Expand Up @@ -444,6 +461,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
rollupRunner,
transformRunner,
indexManagementIndices,
controlCenterIndices,
actionValidation,
managedIndexCoordinator,
indexStateManagementHistory,
Expand Down Expand Up @@ -567,7 +585,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(SMActions.START_SM_POLICY_ACTION_TYPE, TransportStartSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.STOP_SM_POLICY_ACTION_TYPE, TransportStopSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.EXPLAIN_SM_POLICY_ACTION_TYPE, TransportExplainSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java)
ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java),
ActionPlugin.ActionHandler(IndexLRONConfigAction.INSTANCE, TransportIndexLRONConfigAction::class.java),
ActionPlugin.ActionHandler(GetLRONConfigAction.INSTANCE, TransportGetLRONConfigAction::class.java),
ActionPlugin.ActionHandler(DeleteLRONConfigAction.INSTANCE, TransportDeleteLRONConfigAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.indexmanagement.util.IndexUtils

class ControlCenterIndices(
private val client: IndicesAdminClient,
private val clusterService: ClusterService,
) {

fun checkAndUpdateControlCenterIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!controlCenterIndexExists()) {
val indexRequest = CreateIndexRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX)
.mapping(controlCenterMappings)
.settings(Settings.builder().put(INDEX_HIDDEN, true).build())
client.create(
indexRequest,
object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
if (e is ResourceAlreadyExistsException) {
/* if two request create the control center index at the same time, may raise this exception */
/* but we don't take it as error */
actionListener.onResponse(
CreateIndexResponse(
true,
true,
IndexManagementPlugin.CONTROL_CENTER_INDEX
)
)
} else actionListener.onFailure(e)
}

override fun onResponse(response: CreateIndexResponse) {
actionListener.onResponse(response)
}
}
)
} else {
IndexUtils.checkAndUpdateIndexMapping(
IndexManagementPlugin.CONTROL_CENTER_INDEX,
IndexUtils.getSchemaVersion(controlCenterMappings),
controlCenterMappings,
clusterService.state(),
client,
actionListener
)
}
}

private fun controlCenterIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(IndexManagementPlugin.CONTROL_CENTER_INDEX)

companion object {
val controlCenterMappings = ControlCenterIndices::class.java.classLoader
.getResource("mappings/opensearch-control-center.json")!!.readText()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.action.ActionResponse
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.util.WITH_PRIORITY
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.util._ID
import java.io.IOException

class LRONConfigResponse(
val id: String,
val lronConfig: LRONConfig
) : ActionResponse(), ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
lronConfig = LRONConfig(sin)
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
lronConfig.writeTo(out)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)

/* drop user info in rest layer. only keep user info in transport layer */
val lronConfigParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false", WITH_PRIORITY to "false"))
builder.field(LRONConfig.LRON_CONFIG_FIELD, lronConfig, lronConfigParams)

return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.opensearch.action.ActionType
import org.opensearch.action.delete.DeleteResponse

class DeleteLRONConfigAction private constructor() : ActionType<DeleteResponse>(NAME, ::DeleteResponse) {
companion object {
val INSTANCE = DeleteLRONConfigAction()
const val NAME = "cluster:admin/opensearch/controlcenter/lron/delete"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX
import java.io.IOException

class DeleteLRONConfigRequest(
val docId: String
) : ActionRequest() {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
docId = sin.readString()
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (!(docId.startsWith(LRON_DOC_ID_PREFIX))) {
validationException = ValidateActions.addValidationError(
"Invalid LRONConfig ID",
validationException
)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(docId)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.common.inject.Inject
import org.opensearch.commons.ConfigConstants
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

class TransportDeleteLRONConfigAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters
) : HandledTransportAction<DeleteLRONConfigRequest, DeleteResponse>(
DeleteLRONConfigAction.NAME, transportService, actionFilters, ::DeleteLRONConfigRequest
) {
private val log = LogManager.getLogger(javaClass)

override fun doExecute(task: Task, request: DeleteLRONConfigRequest, listener: ActionListener<DeleteResponse>) {
DeleteLRONConfigHandler(client, listener, request).start()
}

inner class DeleteLRONConfigHandler(
private val client: NodeClient,
private val actionListener: ActionListener<DeleteResponse>,
private val request: DeleteLRONConfigRequest,
private val docId: String = request.docId
) {
fun start() {
log.debug(
"User and roles string from thread context: ${
client.threadPool().threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
)
}"
)

client.threadPool().threadContext.stashContext().use {
val deleteRequest = DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

client.delete(deleteRequest, actionListener)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.get

import org.opensearch.action.ActionType

class GetLRONConfigAction private constructor() : ActionType<GetLRONConfigResponse>(NAME, ::GetLRONConfigResponse) {
companion object {
val INSTANCE = GetLRONConfigAction()
const val NAME = "cluster:admin/opensearch/controlcenter/lron/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.get

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.common.model.rest.SearchParams
import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX
import java.io.IOException

class GetLRONConfigRequest(
val docId: String? = null,
val searchParams: SearchParams? = null
) : ActionRequest() {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
docId = sin.readOptionalString(),
searchParams = sin.readOptionalWriteable(::SearchParams)
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (null == docId && null == searchParams) {
validationException = ValidateActions.addValidationError(
"GetLRONConfigRequest must contain docId or searchParams",
validationException
)
}
if (null != docId && null != searchParams) {
validationException = ValidateActions.addValidationError(
"Get LRONConfig requires either docId or searchParams to be specified",
validationException
)
}
if (null != docId && !docId.startsWith(LRON_DOC_ID_PREFIX)) {
validationException = ValidateActions.addValidationError(
"Invalid LRONConfig ID",
validationException
)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(docId)
out.writeOptionalWriteable(searchParams)
}
}
Loading

0 comments on commit 96f7380

Please sign in to comment.