Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRUD api for long running operation notification config #722

Merged
merged 6 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices
import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.delete.TransportDeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.get.TransportGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.TransportIndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestDeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestIndexLRONConfigAction
import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService
import org.opensearch.indexmanagement.indexstatemanagement.ExtensionStatusChecker
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
Expand Down Expand Up @@ -201,11 +211,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
companion object {
const val PLUGINS_BASE_URI = "/_plugins"
const val ISM_BASE_URI = "$PLUGINS_BASE_URI/_ism"
const val IM_BASE_URI = "$PLUGINS_BASE_URI/_im"
const val ROLLUP_BASE_URI = "$PLUGINS_BASE_URI/_rollup"
const val TRANSFORM_BASE_URI = "$PLUGINS_BASE_URI/_transform"
const val LRON_BASE_URI = "$IM_BASE_URI/lron"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val ROLLUP_JOBS_BASE_URI = "$ROLLUP_BASE_URI/jobs"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val CONTROL_CENTER_INDEX = ".opensearch-control-center"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

Expand Down Expand Up @@ -338,7 +351,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestExplainSMPolicyHandler(),
RestDeleteSMPolicyHandler(),
RestCreateSMPolicyHandler(),
RestUpdateSMPolicyHandler()
RestUpdateSMPolicyHandler(),
RestIndexLRONConfigAction(),
RestGetLRONConfigAction(),
RestDeleteLRONConfigAction()
)
}

Expand Down Expand Up @@ -396,6 +412,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerConsumers()
.registerClusterConfigurationProvider(skipFlag)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
val controlCenterIndices = ControlCenterIndices(client.admin().indices(), clusterService)
actionValidation = ActionValidation(settings, clusterService, jvmService)
val indexStateManagementHistory =
IndexStateManagementHistory(
Expand Down Expand Up @@ -444,6 +461,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
rollupRunner,
transformRunner,
indexManagementIndices,
controlCenterIndices,
actionValidation,
managedIndexCoordinator,
indexStateManagementHistory,
Expand Down Expand Up @@ -567,7 +585,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(SMActions.START_SM_POLICY_ACTION_TYPE, TransportStartSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.STOP_SM_POLICY_ACTION_TYPE, TransportStopSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.EXPLAIN_SM_POLICY_ACTION_TYPE, TransportExplainSMAction::class.java),
ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java)
ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java),
ActionPlugin.ActionHandler(IndexLRONConfigAction.INSTANCE, TransportIndexLRONConfigAction::class.java),
ActionPlugin.ActionHandler(GetLRONConfigAction.INSTANCE, TransportGetLRONConfigAction::class.java),
ActionPlugin.ActionHandler(DeleteLRONConfigAction.INSTANCE, TransportDeleteLRONConfigAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.indexmanagement.util.IndexUtils

class ControlCenterIndices(
private val client: IndicesAdminClient,
private val clusterService: ClusterService,
) {

fun checkAndUpdateControlCenterIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!controlCenterIndexExists()) {
val indexRequest = CreateIndexRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX)
.mapping(controlCenterMappings)
.settings(Settings.builder().put(INDEX_HIDDEN, true).build())
client.create(
indexRequest,
object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
if (e is ResourceAlreadyExistsException) {
/* if two request create the control center index at the same time, may raise this exception */
/* but we don't take it as error */
actionListener.onResponse(
CreateIndexResponse(
true,
true,
IndexManagementPlugin.CONTROL_CENTER_INDEX
)
)
} else actionListener.onFailure(e)
}

override fun onResponse(response: CreateIndexResponse) {
actionListener.onResponse(response)
}
}
)
} else {
IndexUtils.checkAndUpdateIndexMapping(
IndexManagementPlugin.CONTROL_CENTER_INDEX,
IndexUtils.getSchemaVersion(controlCenterMappings),
controlCenterMappings,
clusterService.state(),
client,
actionListener
)
}
}

private fun controlCenterIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(IndexManagementPlugin.CONTROL_CENTER_INDEX)

companion object {
val controlCenterMappings = ControlCenterIndices::class.java.classLoader
.getResource("mappings/opensearch-control-center.json")!!.readText()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.action.ActionResponse
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.util.WITH_PRIORITY
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.util._ID
import java.io.IOException

class LRONConfigResponse(
val id: String,
val lronConfig: LRONConfig
) : ActionResponse(), ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
lronConfig = LRONConfig(sin)
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
lronConfig.writeTo(out)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)

/* drop user info in rest layer. only keep user info in transport layer */
val lronConfigParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false", WITH_PRIORITY to "false"))
builder.field(LRONConfig.LRON_CONFIG_FIELD, lronConfig, lronConfigParams)

return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.opensearch.action.ActionType
import org.opensearch.action.delete.DeleteResponse

class DeleteLRONConfigAction private constructor() : ActionType<DeleteResponse>(NAME, ::DeleteResponse) {
companion object {
val INSTANCE = DeleteLRONConfigAction()
const val NAME = "cluster:admin/opensearch/controlcenter/lron/delete"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX
import java.io.IOException

class DeleteLRONConfigRequest(
val docId: String
) : ActionRequest() {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
docId = sin.readString()
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (!(docId.startsWith(LRON_DOC_ID_PREFIX))) {
zhichao-aws marked this conversation as resolved.
Show resolved Hide resolved
validationException = ValidateActions.addValidationError(
"Invalid LRONConfig ID",
validationException
)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(docId)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.delete

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.common.inject.Inject
import org.opensearch.commons.ConfigConstants
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

class TransportDeleteLRONConfigAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters
) : HandledTransportAction<DeleteLRONConfigRequest, DeleteResponse>(
DeleteLRONConfigAction.NAME, transportService, actionFilters, ::DeleteLRONConfigRequest
) {
private val log = LogManager.getLogger(javaClass)

override fun doExecute(task: Task, request: DeleteLRONConfigRequest, listener: ActionListener<DeleteResponse>) {
DeleteLRONConfigHandler(client, listener, request).start()
}

inner class DeleteLRONConfigHandler(
private val client: NodeClient,
private val actionListener: ActionListener<DeleteResponse>,
private val request: DeleteLRONConfigRequest,
private val docId: String = request.docId
) {
fun start() {
log.debug(
"User and roles string from thread context: ${
client.threadPool().threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
)
}"
)

client.threadPool().threadContext.stashContext().use {
val deleteRequest = DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

client.delete(deleteRequest, actionListener)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.get

import org.opensearch.action.ActionType

class GetLRONConfigAction private constructor() : ActionType<GetLRONConfigResponse>(NAME, ::GetLRONConfigResponse) {
companion object {
val INSTANCE = GetLRONConfigAction()
const val NAME = "cluster:admin/opensearch/controlcenter/lron/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.action.get

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.common.model.rest.SearchParams
import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX
import java.io.IOException

class GetLRONConfigRequest(
val docId: String? = null,
val searchParams: SearchParams? = null
) : ActionRequest() {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
docId = sin.readOptionalString(),
searchParams = sin.readOptionalWriteable(::SearchParams)
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (null == docId && null == searchParams) {
validationException = ValidateActions.addValidationError(
"GetLRONConfigRequest must contain docId or searchParams",
validationException
)
}
if (null != docId && null != searchParams) {
validationException = ValidateActions.addValidationError(
"Get LRONConfig requires either docId or searchParams to be specified",
validationException
)
}
if (null != docId && !docId.startsWith(LRON_DOC_ID_PREFIX)) {
validationException = ValidateActions.addValidationError(
"Invalid LRONConfig ID",
validationException
)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(docId)
out.writeOptionalWriteable(searchParams)
}
}
Loading