diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 7f15a0056..3180559e9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -29,6 +29,16 @@ import org.opensearch.core.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment +import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.TransportDeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.get.TransportGetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.index.TransportIndexLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestDeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestGetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestIndexLRONConfigAction import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService import org.opensearch.indexmanagement.indexstatemanagement.ExtensionStatusChecker import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser @@ -201,11 +211,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin companion object { const val PLUGINS_BASE_URI = "/_plugins" const val ISM_BASE_URI = "$PLUGINS_BASE_URI/_ism" + const val IM_BASE_URI = "$PLUGINS_BASE_URI/_im" const val ROLLUP_BASE_URI = "$PLUGINS_BASE_URI/_rollup" const val TRANSFORM_BASE_URI = "$PLUGINS_BASE_URI/_transform" + const val LRON_BASE_URI = "$IM_BASE_URI/lron" const val POLICY_BASE_URI = "$ISM_BASE_URI/policies" const val ROLLUP_JOBS_BASE_URI = "$ROLLUP_BASE_URI/jobs" const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config" + const val CONTROL_CENTER_INDEX = ".opensearch-control-center" const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management" const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data" @@ -338,7 +351,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RestExplainSMPolicyHandler(), RestDeleteSMPolicyHandler(), RestCreateSMPolicyHandler(), - RestUpdateSMPolicyHandler() + RestUpdateSMPolicyHandler(), + RestIndexLRONConfigAction(), + RestGetLRONConfigAction(), + RestDeleteLRONConfigAction() ) } @@ -396,6 +412,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin .registerConsumers() .registerClusterConfigurationProvider(skipFlag) indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService) + val controlCenterIndices = ControlCenterIndices(client.admin().indices(), clusterService) actionValidation = ActionValidation(settings, clusterService, jvmService) val indexStateManagementHistory = IndexStateManagementHistory( @@ -444,6 +461,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin rollupRunner, transformRunner, indexManagementIndices, + controlCenterIndices, actionValidation, managedIndexCoordinator, indexStateManagementHistory, @@ -567,7 +585,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ActionPlugin.ActionHandler(SMActions.START_SM_POLICY_ACTION_TYPE, TransportStartSMAction::class.java), ActionPlugin.ActionHandler(SMActions.STOP_SM_POLICY_ACTION_TYPE, TransportStopSMAction::class.java), ActionPlugin.ActionHandler(SMActions.EXPLAIN_SM_POLICY_ACTION_TYPE, TransportExplainSMAction::class.java), - ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java) + ActionPlugin.ActionHandler(SMActions.GET_SM_POLICIES_ACTION_TYPE, TransportGetSMPoliciesAction::class.java), + ActionPlugin.ActionHandler(IndexLRONConfigAction.INSTANCE, TransportIndexLRONConfigAction::class.java), + ActionPlugin.ActionHandler(GetLRONConfigAction.INSTANCE, TransportGetLRONConfigAction::class.java), + ActionPlugin.ActionHandler(DeleteLRONConfigAction.INSTANCE, TransportDeleteLRONConfigAction::class.java) ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt new file mode 100644 index 000000000..dbb68a4b2 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.IndicesAdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN +import org.opensearch.indexmanagement.util.IndexUtils + +class ControlCenterIndices( + private val client: IndicesAdminClient, + private val clusterService: ClusterService, +) { + + fun checkAndUpdateControlCenterIndex(actionListener: ActionListener) { + if (!controlCenterIndexExists()) { + val indexRequest = CreateIndexRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX) + .mapping(controlCenterMappings) + .settings(Settings.builder().put(INDEX_HIDDEN, true).build()) + client.create( + indexRequest, + object : ActionListener { + override fun onFailure(e: Exception) { + if (e is ResourceAlreadyExistsException) { + /* if two request create the control center index at the same time, may raise this exception */ + /* but we don't take it as error */ + actionListener.onResponse( + CreateIndexResponse( + true, + true, + IndexManagementPlugin.CONTROL_CENTER_INDEX + ) + ) + } else actionListener.onFailure(e) + } + + override fun onResponse(response: CreateIndexResponse) { + actionListener.onResponse(response) + } + } + ) + } else { + IndexUtils.checkAndUpdateIndexMapping( + IndexManagementPlugin.CONTROL_CENTER_INDEX, + IndexUtils.getSchemaVersion(controlCenterMappings), + controlCenterMappings, + clusterService.state(), + client, + actionListener + ) + } + } + + private fun controlCenterIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(IndexManagementPlugin.CONTROL_CENTER_INDEX) + + companion object { + val controlCenterMappings = ControlCenterIndices::class.java.classLoader + .getResource("mappings/opensearch-control-center.json")!!.readText() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigResponse.kt new file mode 100644 index 000000000..bc5a69f34 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigResponse.kt @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.util.WITH_PRIORITY +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER +import org.opensearch.indexmanagement.util._ID +import java.io.IOException + +class LRONConfigResponse( + val id: String, + val lronConfig: LRONConfig +) : ActionResponse(), ToXContentObject { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + lronConfig = LRONConfig(sin) + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + lronConfig.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + + /* drop user info in rest layer. only keep user info in transport layer */ + val lronConfigParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false", WITH_PRIORITY to "false")) + builder.field(LRONConfig.LRON_CONFIG_FIELD, lronConfig, lronConfigParams) + + return builder.endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigAction.kt new file mode 100644 index 000000000..665487620 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigAction.kt @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.delete + +import org.opensearch.action.ActionType +import org.opensearch.action.delete.DeleteResponse + +class DeleteLRONConfigAction private constructor() : ActionType(NAME, ::DeleteResponse) { + companion object { + val INSTANCE = DeleteLRONConfigAction() + const val NAME = "cluster:admin/opensearch/controlcenter/lron/delete" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigRequest.kt new file mode 100644 index 000000000..b133ae13f --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/DeleteLRONConfigRequest.kt @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.delete + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX +import java.io.IOException + +class DeleteLRONConfigRequest( + val docId: String +) : ActionRequest() { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + docId = sin.readString() + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (!(docId.startsWith(LRON_DOC_ID_PREFIX))) { + validationException = ValidateActions.addValidationError( + "Invalid LRONConfig ID", + validationException + ) + } + return validationException + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(docId) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt new file mode 100644 index 000000000..9e132ec34 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.delete + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.ActionListener +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.client.node.NodeClient +import org.opensearch.common.inject.Inject +import org.opensearch.commons.ConfigConstants +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +class TransportDeleteLRONConfigAction @Inject constructor( + val client: NodeClient, + transportService: TransportService, + actionFilters: ActionFilters +) : HandledTransportAction( + DeleteLRONConfigAction.NAME, transportService, actionFilters, ::DeleteLRONConfigRequest +) { + private val log = LogManager.getLogger(javaClass) + + override fun doExecute(task: Task, request: DeleteLRONConfigRequest, listener: ActionListener) { + DeleteLRONConfigHandler(client, listener, request).start() + } + + inner class DeleteLRONConfigHandler( + private val client: NodeClient, + private val actionListener: ActionListener, + private val request: DeleteLRONConfigRequest, + private val docId: String = request.docId + ) { + fun start() { + log.debug( + "User and roles string from thread context: ${ + client.threadPool().threadContext.getTransient( + ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT + ) + }" + ) + + client.threadPool().threadContext.stashContext().use { + val deleteRequest = DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + + client.delete(deleteRequest, actionListener) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigAction.kt new file mode 100644 index 000000000..ba966c543 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.get + +import org.opensearch.action.ActionType + +class GetLRONConfigAction private constructor() : ActionType(NAME, ::GetLRONConfigResponse) { + companion object { + val INSTANCE = GetLRONConfigAction() + const val NAME = "cluster:admin/opensearch/controlcenter/lron/get" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigRequest.kt new file mode 100644 index 000000000..4b7c44297 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigRequest.kt @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.get + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.indexmanagement.common.model.rest.SearchParams +import org.opensearch.indexmanagement.controlcenter.notification.util.LRON_DOC_ID_PREFIX +import java.io.IOException + +class GetLRONConfigRequest( + val docId: String? = null, + val searchParams: SearchParams? = null +) : ActionRequest() { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + docId = sin.readOptionalString(), + searchParams = sin.readOptionalWriteable(::SearchParams) + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (null == docId && null == searchParams) { + validationException = ValidateActions.addValidationError( + "GetLRONConfigRequest must contain docId or searchParams", + validationException + ) + } + if (null != docId && null != searchParams) { + validationException = ValidateActions.addValidationError( + "Get LRONConfig requires either docId or searchParams to be specified", + validationException + ) + } + if (null != docId && !docId.startsWith(LRON_DOC_ID_PREFIX)) { + validationException = ValidateActions.addValidationError( + "Invalid LRONConfig ID", + validationException + ) + } + return validationException + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeOptionalString(docId) + out.writeOptionalWriteable(searchParams) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigResponse.kt new file mode 100644 index 000000000..4519b4bd5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/GetLRONConfigResponse.kt @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.get + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig.Companion.LRON_CONFIG_FIELDS +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER +import java.io.IOException + +class GetLRONConfigResponse( + val lronConfigResponses: List, + val totalNumber: Int, +) : ActionResponse(), ToXContentObject { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + lronConfigResponses = sin.readList(::LRONConfigResponse), + totalNumber = sin.readInt(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeList(lronConfigResponses) + out.writeInt(totalNumber) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + val lronConfigParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false")) + + return builder.startObject() + .startArray(LRON_CONFIG_FIELDS) + .also { lronConfigResponses.forEach { lronConfigResponse -> lronConfigResponse.toXContent(it, lronConfigParams) } } + .endArray() + .field("total_number", totalNumber) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt new file mode 100644 index 000000000..aea80dbd8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.get + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.ActionListener +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.client.node.NodeClient +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.commons.ConfigConstants +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.query.QueryBuilders +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.util.getLRONConfigAndParse +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +class TransportGetLRONConfigAction @Inject constructor( + val client: NodeClient, + transportService: TransportService, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, +) : HandledTransportAction( + GetLRONConfigAction.NAME, transportService, actionFilters, ::GetLRONConfigRequest +) { + private val log = LogManager.getLogger(javaClass) + + override fun doExecute(task: Task, request: GetLRONConfigRequest, listener: ActionListener) { + GetLRONConfigHandler(client, listener, request).start() + } + + inner class GetLRONConfigHandler( + private val client: NodeClient, + private val actionListener: ActionListener, + private val request: GetLRONConfigRequest + ) { + fun start() { + log.debug( + "User and roles string from thread context: ${client.threadPool().threadContext.getTransient( + ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT + )}" + ) + client.threadPool().threadContext.stashContext().use { + if (null != request.docId) { + getLRONConfigAndParse( + client, + request.docId, + xContentRegistry, + object : ActionListener { + override fun onResponse(response: LRONConfigResponse) { + actionListener.onResponse(GetLRONConfigResponse(listOf(response), 1)) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + ) + } else { + doSearch() + } + } + } + + private fun doSearch() { + val params = request.searchParams + val sortBuilder = params!!.getSortBuilder() + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("lron_config")) + .must(QueryBuilders.queryStringQuery(params.queryString)) + + val searchSourceBuilder = SearchSourceBuilder() + .query(queryBuilder) + .sort(sortBuilder) + .from(params.from) + .size(params.size) + + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(IndexManagementPlugin.CONTROL_CENTER_INDEX) + + client.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalNumber = response.hits.totalHits?.value ?: 0 + val lronConfigResponses = response.hits.hits.map { + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, it.sourceAsString) + LRONConfigResponse( + id = it.id, + lronConfig = xcp.parseWithType(id = it.id, parse = LRONConfig.Companion::parse) + ) + } + actionListener.onResponse(GetLRONConfigResponse(lronConfigResponses, totalNumber.toInt())) + } + + override fun onFailure(e: Exception) { + if (e is IndexNotFoundException) { + // config index hasn't been initialized, catch this here and show empty result + actionListener.onResponse(GetLRONConfigResponse(emptyList(), 0)) + return + } + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigAction.kt new file mode 100644 index 000000000..ae3f9c6e3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigAction.kt @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.index + +import org.opensearch.action.ActionType +import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse + +class IndexLRONConfigAction private constructor() : + ActionType(NAME, ::LRONConfigResponse) { + companion object { + val INSTANCE = IndexLRONConfigAction() + const val NAME = "cluster:admin/opensearch/controlcenter/lron/write" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigRequest.kt new file mode 100644 index 000000000..2dbc24889 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/IndexLRONConfigRequest.kt @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.index + +import org.opensearch.action.ActionRequest +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import java.io.IOException + +class IndexLRONConfigRequest( + val lronConfig: LRONConfig, + val isUpdate: Boolean = false, + val dryRun: Boolean = false +) : ActionRequest() { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + lronConfig = LRONConfig(sin), + isUpdate = sin.readBoolean(), + dryRun = sin.readBoolean() + ) + + override fun validate() = null + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + lronConfig.writeTo(out) + out.writeBoolean(isUpdate) + out.writeBoolean(dryRun) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt new file mode 100644 index 000000000..5ea8984e2 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt @@ -0,0 +1,142 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action.index + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices +import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.controlcenter.notification.util.getPriority +import org.opensearch.indexmanagement.util.SecurityUtils +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +@Suppress("LongParameterList") +class TransportIndexLRONConfigAction @Inject constructor( + val client: NodeClient, + transportService: TransportService, + actionFilters: ActionFilters, + val clusterService: ClusterService, + val controlCenterIndices: ControlCenterIndices, + val xContentRegistry: NamedXContentRegistry, +) : HandledTransportAction( + IndexLRONConfigAction.NAME, transportService, actionFilters, ::IndexLRONConfigRequest +) { + private val log = LogManager.getLogger(javaClass) + + override fun doExecute(task: Task, request: IndexLRONConfigRequest, listener: ActionListener) { + IndexLRONConfigHandler(client, listener, request).start() + } + + inner class IndexLRONConfigHandler( + private val client: NodeClient, + private val actionListener: ActionListener, + private val request: IndexLRONConfigRequest, + private val user: User? = SecurityUtils.buildUser(client.threadPool().threadContext), + private val docId: String = getDocID(request.lronConfig.taskId, request.lronConfig.actionName) + ) { + fun start() { + log.debug( + "User and roles string from thread context: ${client.threadPool().threadContext.getTransient( + ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT + )}" + ) + client.threadPool().threadContext.stashContext().use { + /* we use dryRun to help check permission and do request validation */ + if (request.dryRun) { + validate() + return + } + controlCenterIndices.checkAndUpdateControlCenterIndex( + ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure) + ) + } + } + + private fun onCreateMappingsResponse(response: AcknowledgedResponse) { + if (response.isAcknowledged) { + log.info("Successfully created or updated ${IndexManagementPlugin.CONTROL_CENTER_INDEX} with newest mappings.") + validate() + } else { + val message = "Unable to create or update ${IndexManagementPlugin.CONTROL_CENTER_INDEX} with newest mapping." + log.error(message) + actionListener.onFailure(OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)) + } + } + + private fun validate() { + /* check whether the node id in task id exists */ + if (null != request.lronConfig.taskId && null == clusterService.state().nodes.get(request.lronConfig.taskId.nodeId)) { + actionListener.onFailure(IllegalArgumentException("Illegal taskID. NodeID not exists.")) + return + } + putLRONConfig() + } + + private fun putLRONConfig() { + val lronConfig = request.lronConfig.copy( + user = this.user, + priority = getPriority(request.lronConfig.taskId, request.lronConfig.actionName) + ) + + if (request.dryRun) { + actionListener.onResponse(LRONConfigResponse(docId, lronConfig)) + return + } + + val indexRequest = IndexRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(lronConfig.toXContent(XContentFactory.jsonBuilder())) + .id(docId) + .timeout(IndexRequest.DEFAULT_TIMEOUT) + if (!request.isUpdate) { + indexRequest.opType(DocWriteRequest.OpType.CREATE) + } + + client.index( + indexRequest, + object : ActionListener { + override fun onResponse(response: IndexResponse) { + if (response.shardInfo.failed > 0) { + val failureReasons = response.shardInfo.failures.joinToString(",") { it.reason() } + actionListener.onFailure(OpenSearchStatusException(failureReasons, response.status())) + } else { + actionListener.onResponse( + LRONConfigResponse( + response.id, + lronConfig + ) + ) + } + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONCondition.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONCondition.kt new file mode 100644 index 000000000..cfc2e5386 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONCondition.kt @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.util.NO_ID +import java.io.IOException + +data class LRONCondition( + val success: Boolean = DEFAULT_ENABLED, + val failure: Boolean = DEFAULT_ENABLED +) : ToXContentObject, Writeable { + + fun toXContent(builder: XContentBuilder): XContentBuilder { + return toXContent(builder, ToXContent.EMPTY_PARAMS) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(SUCCESS_FIELD, success) + .field(FAILURE_FIELD, failure) + .endObject() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + success = sin.readBoolean(), + failure = sin.readBoolean() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeBoolean(success) + out.writeBoolean(failure) + } + + fun isEnabled(): Boolean { + return success || failure + } + + companion object { + const val SUCCESS_FIELD = "success" + const val FAILURE_FIELD = "failure" + const val LRON_CONDITION_FIELD = "lron_condition" + private const val DEFAULT_ENABLED = true + + @JvmStatic + @Throws(IOException::class) + @Suppress("UNUSED_PARAMETER") + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): LRONCondition { + return parse(xcp) + } + + @JvmStatic + @Suppress("MaxLineLength", "ComplexMethod", "NestedBlockDepth") + @Throws(IOException::class) + fun parse(xcp: XContentParser): LRONCondition { + var success: Boolean = DEFAULT_ENABLED + var failure: Boolean = DEFAULT_ENABLED + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SUCCESS_FIELD -> success = xcp.booleanValue() + FAILURE_FIELD -> failure = xcp.booleanValue() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in LRONCondition.") + } + } + + return LRONCondition( + success = success, + failure = failure + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONConfig.kt new file mode 100644 index 000000000..3808fe516 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/model/LRONConfig.kt @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.controlcenter.notification.util.WITH_PRIORITY +import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.controlcenter.notification.util.validateTaskIdAndActionName +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER +import org.opensearch.indexmanagement.opensearchapi.optionalUserField +import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.tasks.TaskId +import java.io.IOException + +data class LRONConfig( + val lronCondition: LRONCondition, + val taskId: TaskId?, + val actionName: String?, + val channels: List?, + val user: User?, + val priority: Int? +) : ToXContentObject, Writeable { + init { + validateTaskIdAndActionName(taskId, actionName) + if (lronCondition.isEnabled()) { + require(!channels.isNullOrEmpty()) { "Enabled LRONConfig must contain at least one channel" } + } + } + + fun toXContent(builder: XContentBuilder): XContentBuilder { + return toXContent(builder, ToXContent.EMPTY_PARAMS) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean(WITH_TYPE, true)) builder.startObject(LRON_CONFIG_FIELD) + builder.field(LRONCondition.LRON_CONDITION_FIELD, lronCondition) + if (null != taskId) builder.field(TASK_ID_FIELD, taskId.toString()) + if (null != actionName) builder.field(ACTION_NAME_FIELD, actionName) + if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) + if (null != channels) { + builder.startArray(CHANNELS_FIELD) + .also { channels.forEach { channel -> channel.toXContent(it, params) } } + .endArray() + } + if (params.paramAsBoolean(WITH_PRIORITY, true)) builder.field(PRIORITY_FIELD, priority) + if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() + return builder.endObject() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + lronCondition = LRONCondition(sin), + taskId = if (sin.readBoolean()) { + TaskId(sin.readString()) + } else null, + actionName = sin.readOptionalString(), + channels = if (sin.readBoolean()) { + sin.readList(::Channel) + } else null, + user = sin.readOptionalWriteable(::User), + priority = sin.readOptionalInt() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + lronCondition.writeTo(out) + if (null != taskId) { + out.writeBoolean(true) + out.writeString(taskId.toString()) + } else out.writeBoolean(false) + out.writeOptionalString(actionName) + if (null != channels) { + out.writeBoolean(true) + out.writeList(channels) + } else out.writeBoolean(false) + out.writeOptionalWriteable(user) + out.writeOptionalInt(priority) + } + + companion object { + const val LRON_CONFIG_FIELD = "lron_config" + const val LRON_CONFIG_FIELDS = "lron_configs" + const val TASK_ID_FIELD = "task_id" + const val ACTION_NAME_FIELD = "action_name" + const val CHANNELS_FIELD = "channels" + const val USER_FIELD = "user" + const val PRIORITY_FIELD = "priority" + const val CHANNEL_TITLE = "Long Running Operation Notification" + + /* to fit with ISM XContentParser.parseWithType function */ + @JvmStatic + @Throws(IOException::class) + @Suppress("UNUSED_PARAMETER") + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): LRONConfig { + return parse(xcp) + } + + @JvmStatic + @Suppress("MaxLineLength", "ComplexMethod", "NestedBlockDepth") + @Throws(IOException::class) + fun parse(xcp: XContentParser): LRONConfig { + var lronCondition = LRONCondition() + var taskId: TaskId? = null + var actionName: String? = null + var channels: List? = null + var user: User? = null + var priority: Int? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + LRONCondition.LRON_CONDITION_FIELD -> lronCondition = LRONCondition.parse(xcp) + TASK_ID_FIELD -> + taskId = + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else TaskId(xcp.text()) + ACTION_NAME_FIELD -> + actionName = + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() + CHANNELS_FIELD -> { + if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) { + channels = mutableListOf() + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + channels.add(Channel.parse(xcp)) + } + } + } + USER_FIELD -> user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) + PRIORITY_FIELD -> priority = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.intValue() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in LRONConfig.") + } + } + + return LRONConfig( + lronCondition = lronCondition, + taskId = taskId, + actionName = actionName, + channels = channels, + user = user, + priority = priority + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigAction.kt new file mode 100644 index 000000000..f24e243de --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigAction.kt @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.opensearch.client.node.NodeClient +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestDeleteLRONConfigAction : BaseRestHandler() { + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.DELETE, "${IndexManagementPlugin.LRON_BASE_URI}/{id}") + ) + } + + override fun getName(): String { + return "delete_lron_config_action" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val docId = request.param("id") + val deleteLRONConfigRequest = DeleteLRONConfigRequest(docId) + + return RestChannelConsumer { channel -> + client.execute(DeleteLRONConfigAction.INSTANCE, deleteLRONConfigRequest, RestToXContentListener(channel)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigAction.kt new file mode 100644 index 000000000..e38825aae --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigAction.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler +import org.opensearch.client.node.NodeClient +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigRequest +import org.opensearch.indexmanagement.controlcenter.notification.util.DEFAULT_LRON_CONFIG_SORT_FIELD +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.RestRequest +import org.opensearch.indexmanagement.util.getSearchParams +import org.opensearch.rest.RestHandler +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestGetLRONConfigAction : BaseRestHandler() { + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.GET, IndexManagementPlugin.LRON_BASE_URI), + RestHandler.Route(RestRequest.Method.GET, "${IndexManagementPlugin.LRON_BASE_URI}/{id}") + ) + } + + override fun getName(): String { + return "get_lron_config_action" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val docId = request.param("id") + val searchParams = if (null == docId) request.getSearchParams(DEFAULT_LRON_CONFIG_SORT_FIELD) else null + val getLRONConfigRequest = GetLRONConfigRequest(docId, searchParams) + + return RestChannelConsumer { channel -> + client.execute(GetLRONConfigAction.INSTANCE, getLRONConfigRequest, RestToXContentListener(channel)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigAction.kt new file mode 100644 index 000000000..b7e0588be --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigAction.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.opensearch.client.node.NodeClient +import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigRequest +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.util.DRY_RUN +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestIndexLRONConfigAction : BaseRestHandler() { + + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.POST, IndexManagementPlugin.LRON_BASE_URI), + RestHandler.Route(RestRequest.Method.PUT, "${IndexManagementPlugin.LRON_BASE_URI}/{id}") + ) + } + + override fun getName(): String { + return "create_lron_config_action" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val docId = request.param("id") + val dryRun = request.paramAsBoolean(DRY_RUN, false) + val xcp = request.contentParser() + val lronConfig = xcp.parseWithType(parse = LRONConfig.Companion::parse) + val isUpdate = null != docId + if (isUpdate && getDocID(lronConfig.taskId, lronConfig.actionName) != docId) { + throw IllegalArgumentException("docId isn't match with lron_config") + } + + val indexLRONConfigRequest = IndexLRONConfigRequest(lronConfig, isUpdate, dryRun) + + return RestChannelConsumer { channel -> + client.execute(IndexLRONConfigAction.INSTANCE, indexLRONConfigRequest, RestToXContentListener(channel)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt new file mode 100644 index 000000000..c4992d3f3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +@file:JvmName("LRONUtils") +package org.opensearch.indexmanagement.controlcenter.notification.util + +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.forcemerge.ForceMergeAction +import org.opensearch.action.admin.indices.open.OpenIndexAction +import org.opensearch.action.admin.indices.shrink.ResizeAction +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.index.reindex.ReindexAction +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.TaskId + +const val LRON_DOC_ID_PREFIX = "LRON:" + +const val WITH_PRIORITY = "with_priority" +const val PRIORITY_TASK_ID = 200 +const val PRIORITY_DEFAULT_ACTION = 100 +const val DEFAULT_LRON_CONFIG_SORT_FIELD = "lron_config.priority" + +val supportedActions = setOf( + ReindexAction.NAME, + ResizeAction.NAME, + ForceMergeAction.NAME, + OpenIndexAction.NAME +) + +fun validateTaskIdAndActionName(taskId: TaskId?, actionName: String?) { + require(null != actionName || null != taskId) { "LRONConfig must contain taskID or actionName" } + validateActionName(actionName) +} + +fun validateActionName(actionName: String?) { + if (null == actionName) { + return + } + require(supportedActions.contains(actionName)) { + "Invalid action name. All supported actions: $supportedActions" + } +} + +fun getPriority(taskId: TaskId? = null, actionName: String? = null): Int { + validateTaskIdAndActionName(taskId, actionName) + return when { + null != taskId -> PRIORITY_TASK_ID + else -> PRIORITY_DEFAULT_ACTION + } +} + +fun getDocID(taskId: TaskId? = null, actionName: String? = null): String { + validateTaskIdAndActionName(taskId, actionName) + val id = taskId?.toString() ?: actionName + return LRON_DOC_ID_PREFIX + id +} + +fun getLRONConfigAndParse( + client: NodeClient, + docId: String, + xContentRegistry: NamedXContentRegistry, + actionListener: ActionListener +) { + val getRequest = GetRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId) + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure( + OpenSearchStatusException( + "lronConfig $docId not found", + RestStatus.NOT_FOUND + ) + ) + return + } + + val lronConfig: LRONConfig + try { + lronConfig = + parseFromGetResponse(response, xContentRegistry, LRONConfig.Companion::parse) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(e) + return + } + actionListener.onResponse( + LRONConfigResponse( + id = response.id, + lronConfig = lronConfig + ) + ) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(t) + } + } + ) +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/RestHandlerUtils.kt index 928d425eb..934b3d3fb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/RestHandlerUtils.kt @@ -21,6 +21,7 @@ const val IF_SEQ_NO = "if_seq_no" const val _PRIMARY_TERM = "_primary_term" const val IF_PRIMARY_TERM = "if_primary_term" const val REFRESH = "refresh" +const val DRY_RUN = "dry_run" fun RestRequest.getSearchParams(defaultPolicySortField: String): SearchParams { val size = this.paramAsInt("size", DEFAULT_PAGINATION_SIZE) diff --git a/src/main/resources/mappings/opensearch-control-center.json b/src/main/resources/mappings/opensearch-control-center.json new file mode 100644 index 000000000..992f5da72 --- /dev/null +++ b/src/main/resources/mappings/opensearch-control-center.json @@ -0,0 +1,75 @@ +{ + "_meta": { + "schema_version": 1 + }, + "dynamic": "strict", + "properties": { + "lron_config": { + "properties": { + "channels": { + "properties": { + "id": { + "type": "keyword" + } + } + }, + "task_id": { + "type": "keyword" + }, + "action_name": { + "type": "keyword" + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + }, + "lron_condition": { + "properties": { + "success": { + "type": "boolean" + }, + "failure": { + "type": "boolean" + } + } + }, + "priority": { + "type": "integer" + } + } + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/indexmanagement/AccessRoles.kt b/src/test/kotlin/org/opensearch/indexmanagement/AccessRoles.kt index 921dee7e4..19682e3ac 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/AccessRoles.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/AccessRoles.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.AddPolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.DeletePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction @@ -57,3 +60,7 @@ const val SEARCH_INDEX = "indices:data/read/search" const val CREATE_INDEX = "indices:admin/create" const val WRITE_INDEX = "indices:data/write/index" const val BULK_WRITE_INDEX = "indices:data/write/bulk*" +// Long-running operation notification (lron) +const val INDEX_LRON_CONFIG = IndexLRONConfigAction.NAME +const val GET_LRON_CONFIG = GetLRONConfigAction.NAME +const val DELETE_LRON_CONFIG = DeleteLRONConfigAction.NAME diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt index 05612f8d4..4f6b67d74 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt @@ -380,7 +380,7 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { return executeRequest(request, expectedStatus, userClient) } - private fun executeRequest( + protected fun executeRequest( request: Request, expectedRestStatus: RestStatus? = null, client: RestClient diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigSecurityBehaviorIT.kt new file mode 100644 index 000000000..83ea7a0c2 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/LRONConfigSecurityBehaviorIT.kt @@ -0,0 +1,198 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.junit.After +import org.junit.Before +import org.opensearch.client.Request +import org.opensearch.client.RestClient +import org.opensearch.commons.rest.SecureRestClientBuilder +import org.opensearch.indexmanagement.DELETE_LRON_CONFIG +import org.opensearch.indexmanagement.GET_LRON_CONFIG +import org.opensearch.indexmanagement.INDEX_LRON_CONFIG +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.SecurityRestTestCase +import org.opensearch.rest.RestStatus + +@Suppress("UNCHECKED_CAST") +class LRONConfigSecurityBehaviorIT : SecurityRestTestCase() { + private val password = "TestpgfhertergGd435AASA123!" + private val superUser = "superUser" + private var superUserClient: RestClient? = null + + private val testUser = "testUser" + private val testRole = "test" + var testUserClient: RestClient? = null + + @Before + fun setupUsersAndRoles() { + initNodeIdsInRestIT(client()) + // Init super user + val helpdeskClusterPermissions = listOf( + INDEX_LRON_CONFIG, + GET_LRON_CONFIG, + DELETE_LRON_CONFIG + ) + + // In this test suite case john is a "super-user" which has all relevant privileges + createUser(name = superUser, pwd = password) + createAndAssignRole(HELPDESK_ROLE, helpdeskClusterPermissions, superUser) + superUserClient = + SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), superUser, password).setSocketTimeout( + 60000 + ).setConnectionRequestTimeout(180000) + .build() + + createUser(name = testUser, pwd = password) + testUserClient = + SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), testUser, password).setSocketTimeout( + 60000 + ).setConnectionRequestTimeout(180000) + .build() + } + + @After + fun cleanup() { + // Remove super user + superUserClient?.close() + deleteUser(superUser) + deleteRole(HELPDESK_ROLE) + // Remove test user + testUserClient?.close() + deleteUser(testUser) + deleteRole(testRole) + deleteIndexByName(IndexManagementPlugin.CONTROL_CENTER_INDEX) + } + + fun `test index LRONConfig with security using POST`() { + /* super user */ + val request = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + request.setJsonEntity(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())).toJsonString()) + executeRequest(request, RestStatus.OK, superUserClient!!) + /* test user */ + request.setJsonEntity(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())).toJsonString()) + executeRequest(request, RestStatus.FORBIDDEN, testUserClient!!) + + val indexConfigRole = "index_lron_config" + try { + createAndAssignRole(indexConfigRole, listOf(INDEX_LRON_CONFIG), testUser) + executeRequest(request, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(indexConfigRole) + } + } + + fun `test index LRONConfig dry run with security using POST`() { + /* super user */ + val request = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + request.addParameter("dry_run", "true") + request.setJsonEntity(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())).toJsonString()) + executeRequest(request, RestStatus.OK, superUserClient!!) + /* test user */ + executeRequest(request, RestStatus.FORBIDDEN, testUserClient!!) + + val indexConfigRole = "index_lron_config" + try { + createAndAssignRole(indexConfigRole, listOf(INDEX_LRON_CONFIG), testUser) + executeRequest(request, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(indexConfigRole) + } + } + + fun `test update LRONConfig with security using PUT`() { + /* super user */ + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val createRequest = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + createRequest.setJsonEntity(lronConfig.toJsonString()) + executeRequest(createRequest, RestStatus.OK, superUserClient!!) + val updateRequest = Request("PUT", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + updateRequest.setJsonEntity(randomLRONConfig(taskId = lronConfig.taskId, actionName = lronConfig.actionName).toJsonString()) + executeRequest(updateRequest, RestStatus.OK, superUserClient!!) + + /* test user */ + executeRequest(updateRequest, RestStatus.FORBIDDEN, testUserClient!!) + + val indexConfigRole = "index_lron_config" + try { + createAndAssignRole(indexConfigRole, listOf(INDEX_LRON_CONFIG), testUser) + executeRequest(updateRequest, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(indexConfigRole) + } + } + + fun `test delete LRONConfig with security`() { + /* super user */ + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val createRequest = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + createRequest.setJsonEntity(lronConfig.toJsonString()) + executeRequest(createRequest, RestStatus.OK, superUserClient!!) + val deleteRequest = Request("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + executeRequest(deleteRequest, RestStatus.OK, superUserClient!!) + + /* test user */ + executeRequest(createRequest, RestStatus.OK, superUserClient!!) + executeRequest(deleteRequest, RestStatus.FORBIDDEN, testUserClient!!) + + val deleteConfigRole = "delete_lron_config" + try { + createAndAssignRole(deleteConfigRole, listOf(DELETE_LRON_CONFIG), testUser) + executeRequest(deleteRequest, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(deleteConfigRole) + } + } + + fun `test get LRONConfig with security`() { + /* super user */ + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val createRequest = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + createRequest.setJsonEntity(lronConfig.toJsonString()) + executeRequest(createRequest, RestStatus.OK, superUserClient!!) + val getRequest = Request("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + executeRequest(getRequest, RestStatus.OK, superUserClient!!) + + /* test user */ + executeRequest(getRequest, RestStatus.FORBIDDEN, testUserClient!!) + + val getConfigRole = "get_lron_config" + try { + createAndAssignRole(getConfigRole, listOf(GET_LRON_CONFIG), testUser) + executeRequest(getRequest, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(getConfigRole) + } + } + + fun `test get LRONConfigs with security`() { + /* super user */ + val createRequest = Request("POST", IndexManagementPlugin.LRON_BASE_URI) + randomList(1, 15) { + createRequest.setJsonEntity(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())).toJsonString()) + executeRequest(createRequest, RestStatus.OK, superUserClient!!).asMap() + } + + val getRequest = Request("GET", IndexManagementPlugin.LRON_BASE_URI) + executeRequest(getRequest, RestStatus.OK, superUserClient!!) + + /* test user */ + executeRequest(getRequest, RestStatus.FORBIDDEN, testUserClient!!) + + val getConfigRole = "get_lron_config" + try { + createAndAssignRole(getConfigRole, listOf(GET_LRON_CONFIG), testUser) + executeRequest(getRequest, RestStatus.OK, testUserClient!!) + } finally { + deleteRole(getConfigRole) + } + } + + private fun createAndAssignRole(roleName: String, permissions: List, user: String) { + createRole(roleName, permissions, emptyList(), emptyList()) + assignRoleToUsers(roleName, listOf(user)) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/SerializationTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/SerializationTests.kt new file mode 100644 index 000000000..d8c2c6979 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/SerializationTests.kt @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.junit.Assert +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigRequest +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigRequest +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigResponse +import org.opensearch.indexmanagement.opensearchapi.convertToMap +import org.opensearch.indexmanagement.snapshotmanagement.getRandomString +import org.opensearch.test.OpenSearchTestCase + +class SerializationTests : OpenSearchTestCase() { + + fun `test lronConfig serialization`() { + val lronConfig = randomLRONConfig() + val out = BytesStreamOutput() + lronConfig.writeTo(out) + + Assert.assertEquals( + buildMessage("lronConfig"), + lronConfig, + LRONConfig(out.bytes().streamInput()) + ) + } + + fun `test deleteLRONConfigRequest`() { + val deleteLRONConfigRequest = DeleteLRONConfigRequest(getRandomString(20)) + val out = BytesStreamOutput() + deleteLRONConfigRequest.writeTo(out) + Assert.assertEquals( + buildMessage("deleteLronConfigRequest"), + deleteLRONConfigRequest.docId, + DeleteLRONConfigRequest(out.bytes().streamInput()).docId + ) + } + + fun `test getLRONConfigRequest`() { + val getLRONConfigRequest = GetLRONConfigRequest(getRandomString(20)) + val out = BytesStreamOutput() + getLRONConfigRequest.writeTo(out) + Assert.assertEquals( + buildMessage("getLronConfigRequest"), + getLRONConfigRequest.docId, + GetLRONConfigRequest(out.bytes().streamInput()).docId + ) + } + + fun `test lronConfigResponse`() { + val lronConfigResponse = randomLRONConfigResponse() + val out = BytesStreamOutput() + lronConfigResponse.writeTo(out) + Assert.assertEquals( + buildMessage("lronConfigResponse"), + lronConfigResponse.convertToMap(), + LRONConfigResponse(out.bytes().streamInput()).convertToMap() + ) + } + + fun `test getLRONConfigResponse`() { + val getLRONConfigResponse = randomGetLRONConfigResponse(10) + val out = BytesStreamOutput() + getLRONConfigResponse.writeTo(out) + Assert.assertEquals( + buildMessage("getLRONConfigResponse"), + getLRONConfigResponse.convertToMap(), + GetLRONConfigResponse(out.bytes().streamInput()).convertToMap() + ) + } + + private fun buildMessage( + itemType: String + ): String { + return "$itemType serialization test failed. " + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/TestHelpers.kt new file mode 100644 index 000000000..bf4b7413a --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/TestHelpers.kt @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.opensearch.client.RestClient +import org.opensearch.common.UUIDs +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONCondition +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.controlcenter.notification.util.getPriority +import org.opensearch.indexmanagement.controlcenter.notification.util.supportedActions +import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigResponse +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.opensearchapi.string +import org.opensearch.indexmanagement.randomUser +import org.opensearch.tasks.TaskId +import org.opensearch.test.OpenSearchTestCase.randomBoolean +import org.opensearch.test.OpenSearchTestCase.randomLong +import org.opensearch.test.rest.OpenSearchRestTestCase + +/* need to be initialized before used */ +var nodeIdsInRestIT: Set = emptySet() +@Suppress("UNCHECKED_CAST") +fun initNodeIdsInRestIT(client: RestClient) { + if (nodeIdsInRestIT.isNotEmpty()) { + return + } + val responseMap = + OpenSearchRestTestCase.entityAsMap(client.makeRequest("GET", "_nodes")) + val nodesMap = responseMap["nodes"] as Map + nodeIdsInRestIT = nodesMap.keys +} + +fun randomLRONConfig( + lronCondition: LRONCondition = randomLRONCondition(), + taskId: TaskId? = randomTaskId(), + actionName: String? = randomActionName(), + channels: List? = List(OpenSearchRestTestCase.randomIntBetween(1, 10)) { randomChannel() }, + user: User? = randomUser() +): LRONConfig { + val priority = getPriority(taskId, actionName) + return LRONConfig( + lronCondition = lronCondition, + taskId = taskId, + actionName = actionName, + channels = channels, + user = user, + priority = priority + ) +} + +fun randomLRONCondition( + success: Boolean = randomBoolean(), + failure: Boolean = randomBoolean() +): LRONCondition { + return LRONCondition(success, failure) +} + +fun randomTaskId( + nodeId: String = UUIDs.randomBase64UUID(), + id: Long = randomLong() +): TaskId { + return TaskId(nodeId, id) +} + +fun randomActionName(): String { + return supportedActions.random() +} + +fun randomLRONConfigResponse( + lronConfig: LRONConfig = randomLRONConfig() +): LRONConfigResponse { + val id = getDocID(lronConfig.taskId, lronConfig.actionName) + return LRONConfigResponse( + id = id, + lronConfig = lronConfig + ) +} + +fun randomGetLRONConfigResponse( + size: Int = 10 +): GetLRONConfigResponse { + return GetLRONConfigResponse( + lronConfigResponses = List(size) { randomLRONConfigResponse() }, + size + ) +} + +fun LRONConfig.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent( + XContentFactory.jsonBuilder(), params +).string() + +fun getResourceURI(taskId: TaskId?, actionName: String?): String { + return "${IndexManagementPlugin.LRON_BASE_URI}/${getDocID(taskId, actionName)}" +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/XContentTests.kt new file mode 100644 index 000000000..526d690a3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/XContentTests.kt @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification + +import org.junit.Assert +import org.junit.BeforeClass +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONCondition +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.util.PRIORITY_TASK_ID +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigResponse +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.opensearchapi.string +import org.opensearch.indexmanagement.randomUser +import org.opensearch.tasks.TaskId +import org.opensearch.test.OpenSearchTestCase + +class XContentTests : OpenSearchTestCase() { + fun `test lronConfig parsing`() { + Assert.assertEquals( + buildMessage("lronConfig", XContentType.JSON), + sampleLRONConfig, + parsedItem(sampleLRONConfig, XContentType.JSON, LRONConfig.Companion::parse) + ) + + val xContentType = XContentType.values().random() + val lronConfig = randomLRONConfig() + Assert.assertEquals( + buildMessage("lronConfig", xContentType), + lronConfig, + parsedItem(lronConfig, xContentType, LRONConfig.Companion::parse) + ) + } + + fun `test lronConfig Parsing default values`() { + val jsonString = """ + { + "lron_config": { + "task_id": "node_123:456", + "channels": [ + { + "id": "channel123" + } + ] + } + } + """.replace("\\s".toRegex(), "") + val lronConfig = XContentType.JSON.xContent().createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + jsonString + ).parseWithType(parse = LRONConfig.Companion::parse) + assertEquals("action name should be null", null, lronConfig.actionName) + assertEquals("should be true by default", true, lronConfig.lronCondition.success) + assertEquals("should be true by default", true, lronConfig.lronCondition.failure) + } + + fun `test lronConfig Parsing with no id no action fails`() { + val jsonString = """ + { + "lron_config": { + "task_id": "node_123:456" + } + } + """.replace("\\s".toRegex(), "") + try { + XContentType.JSON.xContent().createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + jsonString + ).parseWithType(parse = LRONConfig.Companion::parse) + Assert.fail("expect to throw error when parsing lronConfig") + } catch (e: IllegalArgumentException) { + assertEquals(e.message, "Enabled LRONConfig must contain at least one channel") + } + } + + fun `test lronConfig Parsing with no channels fails`() { + val jsonString = """ + { + "lron_config": { + "channels": [ + { + "id": "channel123" + } + ] + } + } + """.replace("\\s".toRegex(), "") + try { + XContentType.JSON.xContent().createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + jsonString + ).parseWithType(parse = LRONConfig.Companion::parse) + Assert.fail("expect to throw error when parsing lronConfig") + } catch (e: IllegalArgumentException) { + assertEquals(e.message, "LRONConfig must contain taskID or actionName") + } + } + + fun `test lronConfigResponse`() { + val responseString = sampleLRONConfigResponse + .toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() + /* we drop the user info and priority info in rest layer */ + assertEquals("lronConfigResponse toXcontent failed.", sampleExpectedJson, responseString) + } + + fun `test getLRONConfigResponse`() { + val response = GetLRONConfigResponse( + listOf(sampleLRONConfigResponse, sampleLRONConfigResponse), + totalNumber = 2 + ) + val responseString = response.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() + val expectedJSON = """ + { + "lron_configs": [ + $sampleExpectedJson, + $sampleExpectedJson + ], + "total_number": 2 + } + """.replace("\\s".toRegex(), "") + + assertEquals("getLRONConfigResponse toXcontent failed.", expectedJSON, responseString) + } + + private fun buildMessage( + itemType: String, + xContentType: XContentType + ): String { + return "$itemType toXContent test failed. xContentType: ${xContentType.subtype()}. " + } + + private fun parsedItem( + item: T, + xContentType: XContentType, + parseWithTypeParser: (xcp: XContentParser, id: String, seqNo: Long, primaryTerm: Long) -> T + ): T { + val bytesReference = toShuffledXContent( + item, + xContentType.xContent().mediaType(), + ToXContent.EMPTY_PARAMS, + randomBoolean() + ) + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + xContentType.xContent().mediaType() + ) + return xcp.parseWithType(parse = parseWithTypeParser) + } + + companion object { + lateinit var sampleLRONConfig: LRONConfig + lateinit var sampleLRONConfigResponse: LRONConfigResponse + lateinit var sampleExpectedJson: String + + @BeforeClass + @JvmStatic + fun setup() { + sampleLRONConfig = LRONConfig( + lronCondition = LRONCondition(success = true, failure = false), + taskId = TaskId("node_123", 456L), + actionName = "indices:admin/resize", + channels = listOf(Channel("channel123"), Channel("channel456")), + user = randomUser(), + priority = PRIORITY_TASK_ID + ) + sampleLRONConfigResponse = LRONConfigResponse( + id = getDocID(sampleLRONConfig.taskId, sampleLRONConfig.actionName), + lronConfig = sampleLRONConfig + ) + sampleExpectedJson = """ + { + "_id": "LRON:node_123:456", + "lron_config": { + "lron_condition": { + "success": true, + "failure": false + }, + "task_id": "node_123:456", + "action_name": "indices:admin/resize", + "channels": [ + { + "id": "channel123" + }, + { + "id": "channel456" + } + ] + } + } + """.replace("\\s".toRegex(), "") + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/ActionTests.kt new file mode 100644 index 000000000..454459481 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/ActionTests.kt @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.action + +import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetLRONConfigAction +import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction +import org.opensearch.test.OpenSearchTestCase + +class ActionTests : OpenSearchTestCase() { + fun `test index lronConfig action name`() { + assertNotNull(IndexLRONConfigAction.INSTANCE.name()) + assertEquals(IndexLRONConfigAction.INSTANCE.name(), IndexLRONConfigAction.NAME) + } + + fun `test delete lronConfig action name`() { + assertNotNull(DeleteLRONConfigAction.INSTANCE.name()) + assertEquals(DeleteLRONConfigAction.INSTANCE.name(), DeleteLRONConfigAction.NAME) + } + + fun `test get lronConfig action name`() { + assertNotNull(GetLRONConfigAction.INSTANCE.name()) + assertEquals(GetLRONConfigAction.INSTANCE.name(), GetLRONConfigAction.NAME) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt new file mode 100644 index 000000000..d3f6258b4 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.HttpEntity +import org.apache.hc.core5.http.io.entity.StringEntity +import org.junit.After +import org.junit.AfterClass +import org.junit.Before +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.IndexManagementRestTestCase +import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT +import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId +import org.opensearch.indexmanagement.controlcenter.notification.toJsonString +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.rest.RestStatus + +abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { + @Before + fun prepareForIT() { + setDebugLogLevel() + /* init cluster node ids in integ test */ + initNodeIdsInRestIT(client()) + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) + } + + @After + fun removeAllDocs() { + try { + client().makeRequest( + "POST", + "${IndexManagementPlugin.CONTROL_CENTER_INDEX}/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity("""{"query": {"match_all": {}}}""", ContentType.APPLICATION_JSON) + ) + } catch (e: ResponseException) { + logger.info(e.response.asMap()) + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun createLRONConfig(lronConfig: LRONConfig): Response { + return client().makeRequest("POST", IndexManagementPlugin.LRON_BASE_URI, emptyMap(), lronConfig.toHttpEntity()) + } + + private fun setDebugLogLevel() { + client().makeRequest( + "PUT", "_cluster/settings", + StringEntity( + """ + { + "transient": { + "logger.org.opensearch.indexmanagement.controlcenter.notification":"DEBUG" + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + } + + protected fun LRONConfig.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON) + + companion object { + @AfterClass + @JvmStatic fun clearIndicesAfterClass() { + wipeAllIndices() + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt new file mode 100644 index 000000000..35ac2ccfc --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.junit.Assert +import org.opensearch.client.ResponseException +import org.opensearch.indexmanagement.controlcenter.notification.getResourceURI +import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT +import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.rest.RestStatus + +@Suppress("UNCHECKED_CAST") +class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() { + fun `test delete LRONConfig`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + createLRONConfig(lronConfig) + + val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val deletedId = responseBody["_id"] as String + val deletedResult = responseBody["result"] as String + Assert.assertEquals("not same doc id", getDocID(lronConfig.taskId, lronConfig.actionName), deletedId) + Assert.assertEquals("wrong delete result", "deleted", deletedResult) + + try { + client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + fail("Expected 404 NOT_FOUND") + } catch (e: ResponseException) { + logger.info(e.response.asMap()) + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test delete nonexist LRONConfig response`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val deletedId = responseBody["_id"] as String + val deletedResult = responseBody["result"] as String + Assert.assertEquals("not same doc id", getDocID(lronConfig.taskId, lronConfig.actionName), deletedId) + Assert.assertEquals("wrong delete result", "not_found", deletedResult) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt new file mode 100644 index 000000000..92f50cb62 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.junit.Assert +import org.opensearch.client.ResponseException +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.getResourceURI +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT +import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.opensearchapi.convertToMap +import org.opensearch.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase + +@Suppress("UNCHECKED_CAST") +class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { + fun `test get LRONConfig with id`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + createLRONConfig(lronConfig) + val response = client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + assertEquals("get LRONConfig failed", RestStatus.OK, response.restStatus()) + val lronConfigs = response.asMap()["lron_configs"] as List> + val responseBody = lronConfigs[0] + val gotId = responseBody["_id"] as String + Assert.assertEquals("not same doc id", getDocID(lronConfig.taskId, lronConfig.actionName), gotId) + val lronConfigMap = lronConfig.convertToMap()[LRONConfig.LRON_CONFIG_FIELD] as Map + Assert.assertEquals( + "not same LRONConfig", + lronConfigMap.filterKeys { it != LRONConfig.USER_FIELD && it != LRONConfig.PRIORITY_FIELD }, + responseBody["lron_config"] as Map + ) + } + + fun `test get nonexist LRONConfig fails`() { + try { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + fail("Expected 404 NOT_FOUND") + } catch (e: ResponseException) { + logger.debug(e.response.asMap()) + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test get all LRONConfigs`() { + /* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */ + wipeAllIndices() + val lronConfigResponses = randomList(1, 15) { + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap() + } + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + val resLRONConfigResponses = responseBody["lron_configs"] as List> + + assertTrue("LRONConfigs total numbers was not the same", resLRONConfigResponses.size == totalNumber) + assertTrue("LRONConfigs response has different size", lronConfigResponses.size == resLRONConfigResponses.size) + + for (lronConfigResponse in lronConfigResponses) { + val resLRONConfigResponse = resLRONConfigResponses.find { lronConfigResponse["_id"] as String == it["_id"] as String } + assertEquals( + "different lronConfigResponse", + lronConfigResponse[LRONConfig.LRON_CONFIG_FIELD], + resLRONConfigResponse!![LRONConfig.LRON_CONFIG_FIELD] + ) + } + } + + fun `test get LRONConfig with docId and searchParams`() { + try { + val lronConfig = randomLRONConfig() + client().makeRequest( + "GET", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + mapOf("size" to "10") + ) + Assert.fail("Expected 400 BAD_REQUEST") + } catch (e: ResponseException) { + logger.debug(e.response.asMap()) + assertEquals("unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + + fun `test get all LRONConfig if index not exists`() { + try { + wipeAllIndices() + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber) + } finally { + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt new file mode 100644 index 000000000..a83ff482f --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt @@ -0,0 +1,188 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.resthandler + +import org.junit.Assert +import org.opensearch.client.ResponseException +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.controlcenter.notification.getResourceURI +import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT +import org.opensearch.indexmanagement.controlcenter.notification.randomLRONCondition +import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig +import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId +import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.opensearchapi.convertToMap +import org.opensearch.indexmanagement.util.DRY_RUN +import org.opensearch.rest.RestStatus + +@Suppress("UNCHECKED_CAST") +class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { + fun `test creating LRONConfig using POST`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val response = createLRONConfig(lronConfig) + assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val createdId = responseBody["_id"] as String + Assert.assertEquals("not same doc id", getDocID(lronConfig.taskId, lronConfig.actionName), createdId) + val lronConfigMap = lronConfig.convertToMap()[LRONConfig.LRON_CONFIG_FIELD] as Map + Assert.assertEquals( + "not same LRONConfig", + lronConfigMap.filterKeys { it != LRONConfig.USER_FIELD && it != LRONConfig.PRIORITY_FIELD }, + responseBody["lron_config"] as Map + ) + } + + fun `test creating LRONConfig with id fails using POST`() { + try { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + client().makeRequest( + "POST", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + emptyMap(), + lronConfig.toHttpEntity() + ) + fail("Expected 405 METHOD_NOT_ALLOWED") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.METHOD_NOT_ALLOWED, e.response.restStatus()) + } + } + + fun `test creating LRONConfig twice fails using POST`() { + try { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + createLRONConfig(lronConfig) + createLRONConfig(lronConfig) + fail("Expected 409 CONFLICT") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.CONFLICT, e.response.restStatus()) + } + } + + fun `test update LRONConfig using PUT`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + createLRONConfig(lronConfig) + + val newLRONConfig = LRONConfig( + lronCondition = randomLRONCondition(), + taskId = lronConfig.taskId, + actionName = lronConfig.actionName, + channels = List(10) { randomChannel() }, + user = null, + priority = null + ) + + val response = client().makeRequest( + "PUT", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + emptyMap(), + newLRONConfig.toHttpEntity() + ) + + assertEquals("update LRONConfig failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val updatedId = responseBody["_id"] as String + Assert.assertEquals("not same doc id", getDocID(lronConfig.taskId, lronConfig.actionName), updatedId) + val newLRONConfigMap = newLRONConfig.convertToMap()[LRONConfig.LRON_CONFIG_FIELD] as Map + Assert.assertEquals( + "not same LRONConfig", + newLRONConfigMap.filterKeys { it != LRONConfig.USER_FIELD && it != LRONConfig.PRIORITY_FIELD }, + responseBody["lron_config"] as Map + ) + } + + fun `test create LRONConfig using PUT`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + val response = client().makeRequest( + "PUT", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + emptyMap(), + lronConfig.toHttpEntity() + ) + assertEquals("autocreate LRONConfig failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val lronConfigMap = lronConfig.convertToMap()[LRONConfig.LRON_CONFIG_FIELD] as Map + Assert.assertEquals( + "not same LRONConfig", + lronConfigMap.filterKeys { it != LRONConfig.USER_FIELD && it != LRONConfig.PRIORITY_FIELD }, + responseBody["lron_config"] as Map + ) + } + + fun `test creating LRONConfig without id fails using PUT`() { + try { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + client().makeRequest( + "PUT", + IndexManagementPlugin.LRON_BASE_URI, + emptyMap(), + lronConfig.toHttpEntity() + ) + fail("Expected 405 METHOD_NOT_ALLOWED") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.METHOD_NOT_ALLOWED, e.response.restStatus()) + } + } + + fun `test creating LRONConfig dryRun`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + /* first use POST and PUT to create, then try to get */ + client().makeRequest( + "POST", + IndexManagementPlugin.LRON_BASE_URI, + mapOf(DRY_RUN to "true"), + lronConfig.toHttpEntity() + ) + client().makeRequest( + "PUT", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + mapOf(DRY_RUN to "true"), + lronConfig.toHttpEntity() + ) + try { + client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) + fail("Expected 404 NOT_FOUND") + } catch (e: ResponseException) { + logger.debug(e.response.asMap()) + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test autocreate index for indexLRONConfig action`() { + wipeAllIndices() + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + var response = createLRONConfig(lronConfig) + assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) + wipeAllIndices() + response = client().makeRequest( + "PUT", + getResourceURI(lronConfig.taskId, lronConfig.actionName), + lronConfig.toHttpEntity() + ) + assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) + wipeAllIndices() + } + + fun `test mappings after LRONConfig creation`() { + val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) + createLRONConfig(lronConfig) + + val response = client().makeRequest("GET", "/${IndexManagementPlugin.CONTROL_CENTER_INDEX}/_mapping") + val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> + val mappingsMap = parserMap[IndexManagementPlugin.CONTROL_CENTER_INDEX]!!["mappings"] as Map + val expected = createParser( + XContentType.JSON.xContent(), + javaClass.classLoader.getResource("mappings/opensearch-control-center.json")!! + .readText() + ) + val expectedMap = expected.map() + + assertEquals("Mappings are different", expectedMap, mappingsMap) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtilsTests.kt new file mode 100644 index 000000000..6be83959a --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtilsTests.kt @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.controlcenter.notification.util + +import org.junit.Assert +import org.opensearch.index.reindex.ReindexAction +import org.opensearch.indexmanagement.controlcenter.notification.randomActionName +import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId +import org.opensearch.tasks.TaskId +import org.opensearch.test.OpenSearchTestCase +import kotlin.IllegalArgumentException + +class LRONUtilsTests : OpenSearchTestCase() { + fun `test validateActionName`() { + validateActionName("indices:data/write/reindex") + Assert.assertThrows(IllegalArgumentException::class.java) { + validateActionName("indices:data/read/search") + } + } + + fun `test validateTaskIdAndActionName`() { + validateTaskIdAndActionName(randomTaskId(), randomActionName()) + validateTaskIdAndActionName(null, randomActionName()) + validateTaskIdAndActionName(randomTaskId(), null) + Assert.assertThrows(IllegalArgumentException::class.java) { + validateTaskIdAndActionName(null, null) + } + Assert.assertThrows(IllegalArgumentException::class.java) { + validateTaskIdAndActionName(randomTaskId(), "indices:data/read/search") + } + } + + fun `test getPriority`() { + Assert.assertEquals(PRIORITY_TASK_ID, getPriority(randomTaskId(), randomActionName())) + Assert.assertEquals(PRIORITY_DEFAULT_ACTION, getPriority(null, randomActionName())) + } + + fun `test getDocID`() { + Assert.assertEquals("LRON:test:123", getDocID(TaskId("test", 123), randomActionName())) + Assert.assertEquals("LRON:indices:data/write/reindex", getDocID(null, ReindexAction.NAME)) + } +}