diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 6cce97a21..b5870f21b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerClusterService(clusterService) .registerClient(client) .registerNamedXContentRegistry(xContentRegistry) + .registerindexNameExpressionResolver(indexNameExpressionResolver) .registerScriptService(scriptService) .registerSettings(settings) .registerThreadPool(threadPool) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index b8ad4c685..b8c8183ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,8 +8,6 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException -import org.opensearch.action.admin.indices.get.GetIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction @@ -24,9 +22,11 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.bytes.BytesReference @@ -54,6 +54,7 @@ import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID +import kotlin.collections.HashMap import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -88,13 +89,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata( - monitor = monitor, + monitor, createWithRunContext = false, skipIndex = isTempMonitor ) val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput - val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() @@ -107,6 +108,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val docsToQueries = mutableMapOf>() try { + // Resolve all passed indices to concrete indices + val indices = IndexUtils.resolveAllIndices( + docLevelMonitorInput.indices, + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( monitor = monitor, @@ -115,12 +123,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexTimeout = monitorCtx.indexTimeout!! ) - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it) - } - val indices = getIndexResponse.indices() - // cleanup old indices that are not monitored anymore from the same monitor for (ind in updatedLastRunContext.keys) { if (!indices.contains(ind)) { @@ -131,8 +133,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indices.forEach { indexName -> // Prepare lastRunContext for each index val indexLastRunContext = lastRunContext.getOrPut(indexName) { - val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse) - MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently) + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(indexName) + ) + MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently) } // Prepare updatedLastRunContext for each index @@ -385,9 +392,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { throw IOException("Invalid input with document-level-monitor.") } - val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput - if (docLevelMonitorInput.indices.size > 1) { - throw IOException("Only one index is supported with document-level-monitor.") + if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) { + throw IllegalArgumentException("DocLevelMonitorInput has no indices") } } @@ -414,13 +420,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // new index is monitored from the beginning of that index private fun createdRecently( monitor: Monitor, - index: String, periodStart: Instant, periodEnd: Instant, - getIndexResponse: GetIndexResponse + indexMetadata: IndexMetadata ): Boolean { val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart - return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli() + val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L + return indexCreationDate > lastExecutionTime.toEpochMilli() } /** diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 170bddb62..9d849c70b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -62,16 +62,24 @@ abstract class MonitorRunner { throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { - val roles = MonitorRunnerService.getRolesForMonitor(monitor) - withClosableContext( - InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles) - ) { - actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( - action, - monitorCtx, - actionOutput[Action.SUBJECT], - actionOutput[Action.MESSAGE]!! - ) + val client = monitorCtx.client + client!!.threadPool().threadContext.stashContext().use { + withClosableContext( + InjectorContextElement( + monitor.id, + monitorCtx.settings!!, + monitorCtx.threadPool!!.threadContext, + monitor.user?.roles, + monitor.user + ) + ) { + actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( + action, + monitorCtx, + actionOutput[Action.SUBJECT], + actionOutput[Action.MESSAGE]!! + ) + } } } ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 55624d66e..25bb42fc9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext( var clusterService: ClusterService? = null, var client: Client? = null, var xContentRegistry: NamedXContentRegistry? = null, + var indexNameExpressionResolver: IndexNameExpressionResolver? = null, var scriptService: ScriptService? = null, var settings: Settings? = null, var threadPool: ThreadPool? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index a3463d03f..bdcf8263b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -35,6 +35,7 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.isDocLevelMonitor import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.component.AbstractLifecycleComponent import org.opensearch.common.settings.Settings @@ -75,6 +76,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService { + this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver + return this + } + fun registerScriptService(scriptService: ScriptService): MonitorRunnerService { this.monitorCtx.scriptService = scriptService return this diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index 45d2cd9b2..176b6d1d4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.alerts import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.admin.cluster.state.ClusterStateRequest @@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT +import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent @@ -357,8 +359,12 @@ class AlertIndices( return try { val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } createIndexResponse.isAcknowledged - } catch (e: ResourceAlreadyExistsException) { - true + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { + true + } else { + throw AlertingException.wrap(t) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 13f6e8147..cd95ab35b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -5,9 +5,8 @@ package org.opensearch.alerting.transport -import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.opensearchapi.suspendUntil @@ -57,6 +57,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) class TransportDeleteMonitorAction @Inject constructor( @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor( if (!validateUserBackendRoles(user, actionListener)) { return } - - GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) { + scope.launch { DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart() } } @@ -109,9 +109,7 @@ class TransportDeleteMonitorAction @Inject constructor( checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) if (canDelete) { - val deleteResponse = deleteMonitor(monitor) - deleteDocLevelMonitorQueriesAndIndices(monitor) - deleteMetadata(monitor) + val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId) actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) } else { actionListener.onFailure( @@ -119,6 +117,7 @@ class TransportDeleteMonitorAction @Inject constructor( ) } } catch (t: Exception) { + log.error("Failed to delete monitor ${deleteRequest.id()}", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor( ) return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } + } + + companion object { + @JvmStatic + suspend fun deleteAllResourcesForMonitor( + client: Client, + monitor: Monitor, + deleteRequest: DeleteRequest, + monitorId: String, + ): DeleteResponse { + val deleteResponse = deleteMonitorDocument(client, deleteRequest) + deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId) + deleteMetadata(client, monitor) + return deleteResponse + } - private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse { + private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse { return client.suspendUntil { delete(deleteRequest, it) } } - private suspend fun deleteMetadata(monitor: Monitor) { + suspend fun deleteMetadata(client: Client, monitor: Monitor) { val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") - val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + try { + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}") + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted, + // we cannot retry based on this failure + log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e) + } } - private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { - val clusterState = clusterService.state() - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + suspend fun deleteDocLevelMonitorQueriesAndIndices( + client: Client, + monitor: Monitor, + monitorId: String, + ) { + try { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitorId) + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitorId) + ) ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) { + cont.resumeWithException(t) + } + } + ) + } } } + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted successfully, + // we cannot retry based on this failure + log.error("Failed to delete doc level queries from query index.", e) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 5d20b51e4..310def8fe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchException import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException @@ -19,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -175,7 +177,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { val indices = mutableListOf() // todo: for doc level alerting: check if index is present before monitor is created. @@ -228,7 +230,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { client.threadPool().threadContext.stashContext().use { IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD() @@ -239,7 +241,7 @@ class TransportIndexMonitorAction @Inject constructor( private val client: Client, private val actionListener: ActionListener, private val request: IndexMonitorRequest, - private val user: User? + private val user: User?, ) { fun resolveUserAndStart() { @@ -303,7 +305,7 @@ class TransportIndexMonitorAction @Inject constructor( } override fun onFailure(t: Exception) { // https://github.com/opensearch-project/alerting/issues/646 - if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { scope.launch { // Wait for the yellow status val request = ClusterHealthRequest() @@ -490,16 +492,30 @@ class TransportIndexMonitorAction @Inject constructor( ) return } - request.monitor = request.monitor.copy(id = indexResponse.id) - var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) - if (created == false) { - log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!") + var metadata: MonitorMetadata? + try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener + request.monitor = request.monitor.copy(id = indexResponse.id) + var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + if (created == false) { + log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") + } + metadata = monitorMetadata + } catch (t: Exception) { + log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor") + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + try { + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(metadata, updating = true) + } catch (t: Exception) { + log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t) + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - // When inserting queries in queryIndex we could update sourceToQueryIndexMapping - MonitorMetadataService.upsertMetadata(metadata, updating = true) actionListener.onResponse( IndexMonitorResponse( @@ -512,6 +528,24 @@ class TransportIndexMonitorAction @Inject constructor( } } + private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) { + // we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request + try { + TransportDeleteMonitorAction.deleteAllResourcesForMonitor( + client, + monitor = monitor, + DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE), + indexMonitorResponse.id + ) + log.debug( + "Cleaned up monitor related resources after monitor creation request partial failure. " + + "Monitor id : ${indexMonitorResponse.id}" + ) + } catch (e: Exception) { + log.error("Failed to clean up monitor after monitor creation request partial failure", e) + } + } + @Suppress("UNCHECKED_CAST") private suspend fun indexDocLevelMonitorQueries( monitor: Monitor, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index b13ea935a..b45011aa3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.delete.DeleteIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse @@ -26,6 +24,7 @@ import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorRunnerService.monitorCtx import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client @@ -86,8 +85,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return try { val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) } createIndexResponse.isAcknowledged - } catch (t: ResourceAlreadyExistsException) { - if (t.message?.contains("already exists") == true) { + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { true } else { throw t @@ -107,9 +106,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it) } if (!acknowledgedResponse.isAcknowledged) { - val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!" - log.error(errorMessage) - throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR)) + log.warn("Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!") } } val alias = dataSources.queryIndex @@ -125,8 +122,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return try { val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) } createIndexResponse.isAcknowledged - } catch (t: ResourceAlreadyExistsException) { - if (t.message?.contains("already exists") == true) { + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { true } else { throw t @@ -202,16 +199,15 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ indexTimeout: TimeValue ) { val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput - val index = docLevelMonitorInput.indices[0] val queries: List = docLevelMonitorInput.queries - val clusterState = clusterService.state() + val indices = IndexUtils.resolveAllIndices( + docLevelMonitorInput.indices, + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = client.suspendUntil { - client.admin().indices().getIndex(getIndexRequest, it) - } - val indices = getIndexResponse.indices() + val clusterState = clusterService.state() // Run through each backing index and apply appropriate mappings to query index indices?.forEach { indexName -> @@ -383,7 +379,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ /** * Adjusts max field limit index setting for query index if source index has higher limit. - * This will prevent max field limit exception, when applying mappings to query index + * This will prevent max field limit exception, when source index has more fields then query index limit */ private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) { val getSettingsResponse: GetSettingsResponse = client.suspendUntil { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index b24962aa5..a2770bc7a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -7,17 +7,21 @@ package org.opensearch.alerting.util import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.index.IndexNotFoundException class IndexUtils { @@ -130,5 +134,26 @@ class IndexUtils { } } } + + @JvmStatic + fun resolveAllIndices(indices: List, clusterService: ClusterService, resolver: IndexNameExpressionResolver): List { + val result = mutableListOf() + + indices.forEach { index -> + val concreteIndices = resolver.concreteIndexNames( + clusterService.state(), + IndicesOptions.lenientExpand(), + true, + index + ) + result.addAll(concreteIndices) + } + + if (result.size == 0) { + throw IndexNotFoundException(indices[0]) + } + + return result + } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index b1b8c4d0d..6bd67a38f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -458,14 +458,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { var findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue( - "Findings saved for test monitor expected 1 instead of ${findings[0].relatedDocIds}", - findings[0].relatedDocIds.contains("1") - ) - assertTrue( - "Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}", - findings[1].relatedDocIds.contains("5") - ) + + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) // clear previous findings and alerts deleteIndex(ALL_FINDING_INDEX_PATTERN) @@ -493,18 +488,11 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 3, findings.size) - assertTrue( - "Findings saved for test monitor expected 14 instead of ${findings[0].relatedDocIds}", - findings[0].relatedDocIds.contains("14") - ) - assertTrue( - "Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}", - findings[1].relatedDocIds.contains("51") - ) - assertTrue( - "Findings saved for test monitor expected 10 instead of ${findings[2].relatedDocIds}", - findings[2].relatedDocIds.contains("10") - ) + + foundFindings = findings.filter { + it.relatedDocIds.contains("14") || it.relatedDocIds.contains("51") || it.relatedDocIds.contains("10") + } + assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } fun `test document-level monitor when alias only has write index with 0 docs`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 3e9540af1..2895750e9 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -7,7 +7,10 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.indices.alias.Alias +import org.opensearch.action.admin.indices.close.CloseIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest @@ -589,6 +592,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } + fun `test cleanup monitor on partial create monitor failure`() { + val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + val analyzer = "dfbdfbafd" + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + client().admin().indices() + .create( + CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex)) + .mapping( + """ + { + "_meta": { + "schema_version": 1 + }, + "properties": { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent() + ) + ).get() + + client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get() + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), + ) + ) + try { + createMonitor(monitor) + fail("monitor creation should fail due to incorrect analyzer name in test setup") + } catch (e: Exception) { + Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0) + } + } + fun `test execute monitor without create when no monitors exists`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) @@ -750,6 +806,178 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(indices.isNotEmpty()) } + fun `test execute monitor with multiple indices in input success`() { + + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex1, "1", testDoc) + indexDoc(testSourceIndex2, "1", testDoc) + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 2, foundFindings.size) + + indexDoc(testSourceIndex1, "2", testDoc) + indexDoc(testSourceIndex2, "2", testDoc) + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 4, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 2, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + + fun `test execute monitor with multiple indices in input first index gets deleted`() { + // Index #1 does not exist + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex2, "1", testDoc) + + client().admin().indices().delete(DeleteIndexRequest(testSourceIndex1)).get() + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 1, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + indexDoc(testSourceIndex2, "2", testDoc) + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + + fun `test execute monitor with multiple indices in input second index gets deleted`() { + // Second index does not exist + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex1, "1", testDoc) + + client().admin().indices().delete(DeleteIndexRequest(testSourceIndex2)).get() + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 1, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + indexDoc(testSourceIndex1, "2", testDoc) + + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + fun `test execute pre-existing monitor and update`() { val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) .settings(Settings.builder().put("index.hidden", true).build()) @@ -880,6 +1108,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1) } + fun `test execute monitor with empty source index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + val monitorId = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, monitorId, false) + + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + refreshIndex(customFindingsIndex) + + var findings = searchFindings(monitorId, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + + indexDoc(index, "1", testDoc) + + executeMonitor(monitor, monitorId, false) + + refreshIndex(customFindingsIndex) + + findings = searchFindings(monitorId, customFindingsIndex) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + } + fun `test execute GetFindingsAction with monitorId param`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt index 4b6f18f50..f9a3e65e4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting.resthandler import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.apache.http.nio.entity.NStringEntity import org.junit.After @@ -34,17 +35,20 @@ import org.opensearch.alerting.assertUserNull import org.opensearch.alerting.makeRequest import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomAlert +import org.opensearch.alerting.randomDocumentLevelMonitor import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.authuser.User import org.opensearch.commons.rest.SecureRestClientBuilder @@ -1451,4 +1455,66 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { } } */ + + /** + * We want to verify that user roles/permissions do not affect clean up of monitors during partial monitor creation failure + */ + fun `test create monitor failure clean up with a user without delete monitor access`() { + enableFilterBy() + createUser(user, user, listOf(TEST_HR_BACKEND_ROLE, "role2").toTypedArray()) + createTestIndex(TEST_HR_INDEX) + createCustomIndexRole( + ALERTING_INDEX_MONITOR_ACCESS, + TEST_HR_INDEX, + getClusterPermissionsFromCustomRole(ALERTING_INDEX_MONITOR_ACCESS) + ) + createUserWithRoles( + user, + listOf(ALERTING_INDEX_MONITOR_ACCESS, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + val docLevelQueryIndex = ".opensearch-alerting-queries-000001" + createIndex( + docLevelQueryIndex, Settings.EMPTY, + """ + "properties" : { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent(), + ".opensearch-alerting-queries" + ) + closeIndex(docLevelQueryIndex) // close index to simulate doc level query indexing failure + try { + val monitor = randomDocumentLevelMonitor( + withMetadata = false, + triggers = listOf(), + inputs = listOf(DocLevelMonitorInput("description", listOf(TEST_HR_INDEX), emptyList())) + ) + userClient?.makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) + fail("Monitor creation should have failed due to error in indexing doc level queries") + } catch (e: ResponseException) { + val search = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(10).toString() + val searchResponse = client().makeRequest( + "GET", "$ALERTING_BASE_URI/_search", + emptyMap(), + StringEntity(search, ContentType.APPLICATION_JSON) + ) + val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") + assertEquals("Monitors found. Clean up unsuccessful", 0, numberDocsFound) + } finally { + deleteRoleAndRoleMapping(ALERTING_INDEX_MONITOR_ACCESS) + } + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt index bf30957d7..34693f37b 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt @@ -18,7 +18,6 @@ import org.opensearch.action.search.ShardSearchFailure import org.opensearch.client.OpenSearchClient import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext -import org.opensearch.common.util.concurrent.ThreadContext.StoredContext import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType @@ -171,28 +170,13 @@ suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPl }) } -/** - * Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread. - * - * @param threadContext - a [ThreadContext] instance - */ -class ElasticThreadContextElement(private val threadContext: ThreadContext) : ThreadContextElement { - - companion object Key : CoroutineContext.Key - private var context: StoredContext = threadContext.newStoredContext(true) - - override val key: CoroutineContext.Key<*> - get() = Key - - override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) { - this.context = threadContext.stashContext() - } - - override fun updateThreadContext(context: CoroutineContext) = this.context.close() -} - -class InjectorContextElement(id: String, settings: Settings, threadContext: ThreadContext, private val roles: List?) : - ThreadContextElement { +class InjectorContextElement( + id: String, + settings: Settings, + threadContext: ThreadContext, + private val roles: List?, + private val user: User? = null +) : ThreadContextElement { companion object Key : CoroutineContext.Key override val key: CoroutineContext.Key<*> @@ -202,6 +186,8 @@ class InjectorContextElement(id: String, settings: Settings, threadContext: Thre override fun updateThreadContext(context: CoroutineContext) { rolesInjectorHelper.injectRoles(roles) + // This is from where plugins extract backend roles. It should be passed when calling APIs of other plugins + rolesInjectorHelper.injectUserInfo(user) } override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {