diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 655bdea6b..597bee2fb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -25,10 +25,12 @@ import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.getBucketKeysHash @@ -166,6 +168,29 @@ class AlertService( } } + // TODO: clean this up so it follows the proper alert management for doc monitors + fun composeDocLevelAlert( + findings: List, + relatedDocIds: List, + ctx: DocumentLevelTriggerExecutionContext, + result: DocumentLevelTriggerRunResult, + alertError: AlertError? + ): Alert { + val currentTime = Instant.now() + + val actionExecutionResults = result.actionResults.map { + ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) + } + + val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR + return Alert( + monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime, + lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message, + actionExecutionResults = actionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, + findingIds = findings, relatedDocIds = relatedDocIds + ) + } + fun updateActionResultsForBucketLevelAlert( currentAlert: Alert, actionResults: Map, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 15d081f7e..8c2417658 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -32,6 +32,7 @@ import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction import org.opensearch.alerting.core.model.ClusterMetricsInput +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler @@ -42,7 +43,6 @@ import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.DocumentLevelTrigger import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger -import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteDestinationAction import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 19d59cb72..7d69d1fe7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -30,8 +30,13 @@ import java.time.Instant object BucketLevelMonitorRunner : MonitorRunner { private val logger = LogManager.getLogger(javaClass) - override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): - MonitorRunResult { + override suspend fun runMonitor( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean + ): MonitorRunResult { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") @@ -82,7 +87,12 @@ object BucketLevelMonitorRunner : MonitorRunner { // in the final output of monitorResult which occurs when all pages have been exhausted. // If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths // with different page counts. - val inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults) + val inputResults = monitorCtx.inputService!!.collectInputResults( + monitor, + periodStart, + periodEnd, + monitorResult.inputResults + ) if (firstIteration) { firstPageOfInputResults = inputResults firstIteration = false @@ -149,7 +159,8 @@ object BucketLevelMonitorRunner : MonitorRunner { // in favor of just using the currentAlerts as-is. currentAlerts.forEach { (trigger, keysToAlertsMap) -> if (triggerResults[trigger.id]?.error == null) - nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap)) + nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) + ?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap)) } for (trigger in monitor.triggers) { @@ -262,24 +273,39 @@ object BucketLevelMonitorRunner : MonitorRunner { if (!dryrun && monitor.id != Monitor.NO_ID) { monitorCtx.alertService!!.saveAlerts(updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false) // Save any COMPLETED Alerts that were not covered in updatedAlerts - monitorCtx.alertService!!.saveAlerts(completedAlertsToUpdate.toList(), monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false) + monitorCtx.alertService!!.saveAlerts( + completedAlertsToUpdate.toList(), + monitorCtx.retryPolicy!!, + allowUpdatingAcknowledgedAlert = false + ) } } return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults) } - override suspend fun runAction(action: Action, ctx: TriggerExecutionContext, monitorCtx: MonitorRunnerExecutionContext, dryrun: Boolean): ActionRunResult { + override suspend fun runAction( + action: Action, + ctx: TriggerExecutionContext, + monitorCtx: MonitorRunnerExecutionContext, + dryrun: Boolean + ): ActionRunResult { return try { val actionOutput = mutableMapOf() - actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) else "" + actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) + MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) + else "" actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx) if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { withContext(Dispatchers.IO) { - val destination = AlertingConfigAccessor.getDestinationInfo(monitorCtx.client!!, monitorCtx.xContentRegistry!!, action.destinationId) + val destination = AlertingConfigAccessor.getDestinationInfo( + monitorCtx.client!!, + monitorCtx.xContentRegistry!!, + action.destinationId + ) if (!destination.isAllowed(monitorCtx.allowList)) { throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}") } @@ -323,8 +349,8 @@ object BucketLevelMonitorRunner : MonitorRunner { if (totalActionableAlertCount > monitorCtx.maxActionableAlertCount) { logger.debug( "The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " + - "which exceeds the maximum of [$(monitorCtx.maxActionableAlertCount)]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " + - "for action execution." + "which exceeds the maximum of [${monitorCtx.maxActionableAlertCount}]. " + + "Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] for action execution." ) return true } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 989b7ae79..206d086b4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -1,25 +1,35 @@ package org.opensearch.alerting +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.AlertingConfigAccessor import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTrigger import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.Finding +import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.action.Action -import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput -import org.opensearch.alerting.model.docLevelInput.DocLevelQuery import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.isAllowed import org.opensearch.alerting.util.updateMonitor +import org.opensearch.client.Client import org.opensearch.cluster.routing.ShardRouting +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.Strings import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentType @@ -33,14 +43,20 @@ import java.io.IOException import java.time.Instant import java.util.UUID import kotlin.collections.HashMap +import kotlin.math.max object DocumentReturningMonitorRunner : MonitorRunner { private val logger = LogManager.getLogger(javaClass) - override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): - MonitorRunResult { + override suspend fun runMonitor( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean + ): MonitorRunResult { logger.info("Document-level-monitor is running ...") - val monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) try { validate(monitor) } catch (e: Exception) { @@ -52,10 +68,11 @@ object DocumentReturningMonitorRunner : MonitorRunner { val index = docLevelMonitorInput.indices[0] val queries: List = docLevelMonitorInput.queries + val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var lastRunContext = monitor.lastRunContext.toMutableMap() try { if (lastRunContext.isNullOrEmpty()) { - lastRunContext = createRunContext(monitorCtx, index).toMutableMap() + lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap() } } catch (e: Exception) { logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") @@ -66,13 +83,20 @@ object DocumentReturningMonitorRunner : MonitorRunner { val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx, index, shard) - updatedLastRunContext[shard] = maxSeqNo.toString() + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) + updatedLastRunContext[shard] = maxSeqNo + + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + lastRunContext[shard] = max(-1, maxSeqNo - 1) + } } val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) + val idQueryMap = mutableMapOf() queries.forEach { query -> val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index) queryToDocIds[query] = matchingDocIds @@ -80,16 +104,28 @@ object DocumentReturningMonitorRunner : MonitorRunner { docsToQueries.putIfAbsent(it, mutableListOf()) docsToQueries[it]?.add(query.id) } + idQueryMap[query.id] = query } - + val queryInputResults = queryToDocIds.mapKeys { it.key.id } + monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) val queryIds = queries.map { it.id } + val triggerResults = mutableMapOf() monitor.triggers.forEach { - runForEachDocTrigger(monitorCtx, it as DocumentLevelTrigger, monitor, docsToQueries, queryIds, queryToDocIds, dryrun) + triggerResults[it.id] = runForEachDocTrigger( + monitorCtx, + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryIds, + dryrun + ) } - // Don't save alerts if this is a test monitor - if (!dryrun && monitor.id != Monitor.NO_ID) { + // Don't update monitor if this is a test monitor + if (!isTempMonitor) { // TODO: Check for race condition against the update monitor api // This does the update at the end in case of errors and makes sure all the queries are executed @@ -100,19 +136,19 @@ object DocumentReturningMonitorRunner : MonitorRunner { } // TODO: Update the Document as part of the Trigger and return back the trigger action result - val triggerResults = mutableMapOf() return monitorResult.copy(triggerResults = triggerResults) } - private fun runForEachDocTrigger( + private suspend fun runForEachDocTrigger( monitorCtx: MonitorRunnerExecutionContext, + monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, monitor: Monitor, + idQueryMap: Map, docsToQueries: Map>, queryIds: List, - queryToDocIds: Map>, dryrun: Boolean - ) { + ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds) @@ -121,23 +157,58 @@ object DocumentReturningMonitorRunner : MonitorRunner { val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0] - queryToDocIds.forEach { - val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs) - // TODO: Update finding only if it is not dry run, else return the findings - if (queryTriggeredDocs.isNotEmpty() && !dryrun && monitor.id != Monitor.NO_ID) { - val findingId = createFindings(monitor, monitorCtx, index, it.key, queryTriggeredDocs, trigger) - // TODO: check if need to create alert, if so create it and point it to FindingId - // TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting + // TODO: modify findings such that there is a finding per document + val findings = mutableListOf() + val findingDocPairs = mutableListOf>() + + // TODO: Implement throttling for findings + if (!dryrun && monitor.id != Monitor.NO_ID) { + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } + val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key), trigger) + findings.add(findingId) + + if (triggerResult.triggeredDocs.contains(it.key)) { + findingDocPairs.add(Pair(findingId, it.key)) + } + } + } + + val actionCtx = triggerCtx.copy( + triggeredDocs = triggerResult.triggeredDocs, + relatedFindings = findings, + error = monitorResult.error ?: triggerResult.error + ) + + for (action in trigger.actions) { + triggerResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, dryrun) + } + + // TODO: Implement throttling for alerts + // Alerts are saved after the actions since if there are failures in the actions, they can be stated in the alert + if (!dryrun && monitor.id != Monitor.NO_ID) { + val alerts = mutableListOf() + findingDocPairs.forEach { + val alert = monitorCtx.alertService!!.composeDocLevelAlert( + listOf(it.first), + listOf(it.second), + triggerCtx, + triggerResult, + monitorResult.alertError() ?: triggerResult.alertError() + ) + alerts.add(alert) } + monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(alerts, it) } } + return triggerResult } - fun createFindings( + private fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, index: String, - docLevelQuery: DocLevelQuery, - matchingDocIds: Set, + docLevelQueries: List, + matchingDocIds: List, trigger: DocumentLevelTrigger ): String { val finding = Finding( @@ -146,9 +217,7 @@ object DocumentReturningMonitorRunner : MonitorRunner { monitorId = monitor.id, monitorName = monitor.name, index = index, - queryId = docLevelQuery.id, - queryTags = docLevelQuery.tags, - severity = docLevelQuery.severity, + docLevelQueries = docLevelQueries, timestamp = Instant.now(), triggerId = trigger.id, triggerName = trigger.name @@ -168,8 +237,49 @@ object DocumentReturningMonitorRunner : MonitorRunner { } // TODO: Implement action for triggers - override suspend fun runAction(action: Action, ctx: TriggerExecutionContext, monitorCtx: MonitorRunnerExecutionContext, dryrun: Boolean): ActionRunResult { - return ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), null) + override suspend fun runAction( + action: Action, + ctx: TriggerExecutionContext, + monitorCtx: MonitorRunnerExecutionContext, + dryrun: Boolean + ): ActionRunResult { + return try { + if (!MonitorRunnerService.isActionActionable(action, (ctx as DocumentLevelTriggerExecutionContext).alert)) { + return ActionRunResult(action.id, action.name, mapOf(), true, null, null) + } + val actionOutput = mutableMapOf() + actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) + MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) + else "" + actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx) + if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { + throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") + } + if (!dryrun) { + withContext(Dispatchers.IO) { + val destination = AlertingConfigAccessor.getDestinationInfo( + monitorCtx.client!!, + monitorCtx.xContentRegistry!!, + action.destinationId + ) + if (!destination.isAllowed(monitorCtx.allowList)) { + throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}") + } + + val destinationCtx = monitorCtx.destinationContextFactory!!.getDestinationContext(destination) + actionOutput[Action.MESSAGE_ID] = destination.publish( + actionOutput[Action.SUBJECT], + actionOutput[Action.MESSAGE]!!, + destinationCtx, + monitorCtx.hostDenyList + ) + } + } + ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null) + } catch (e: Exception) { + logger.debug("Failed to run action", AlertingException.wrap(e)) + ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e) + } } private fun validate(monitor: Monitor) { @@ -187,15 +297,15 @@ object DocumentReturningMonitorRunner : MonitorRunner { } } - private fun createRunContext(monitorCtx: MonitorRunnerExecutionContext, index: String): HashMap { + fun createRunContext(clusterService: ClusterService, client: Client, index: String): HashMap { val lastRunContext = HashMap() lastRunContext["index"] = index - val count = getShardsCount(monitorCtx, index) + val count = getShardsCount(clusterService, index) lastRunContext["shards_count"] = count for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx, index, shard) + val maxSeqNo: Long = getMaxSeqNo(client, index, shard) lastRunContext[shard] = maxSeqNo } return lastRunContext @@ -205,7 +315,7 @@ object DocumentReturningMonitorRunner : MonitorRunner { * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. */ - private fun getMaxSeqNo(monitorCtx: MonitorRunnerExecutionContext, index: String, shard: String): Long { + private fun getMaxSeqNo(client: Client, index: String, shard: String): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -217,7 +327,7 @@ object DocumentReturningMonitorRunner : MonitorRunner { .query(QueryBuilders.matchAllQuery()) .size(1) ) - val response: SearchResponse = monitorCtx.client!!.search(request).actionGet() + val response: SearchResponse = client.search(request).actionGet() if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } @@ -227,8 +337,8 @@ object DocumentReturningMonitorRunner : MonitorRunner { return response.hits.hits[0].seqNo } - private fun getShardsCount(monitorCtx: MonitorRunnerExecutionContext, index: String): Int { - val allShards: List = monitorCtx.clusterService!!.state().routingTable().allShards(index) + private fun getShardsCount(clusterService: ClusterService, index: String): Int { + val allShards: List = clusterService!!.state().routingTable().allShards(index) return allShards.filter { it.primary() }.size } @@ -245,14 +355,15 @@ object DocumentReturningMonitorRunner : MonitorRunner { try { logger.info("Monitor execution for shard: $shard") - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") + val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val maxSeqNo = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() + logger.info("Shard_$shard has MaxSeqNo: $maxSeqNo and PrevSeqNo: $prevSeqNo") val hits: SearchHits = searchShard( monitorCtx, index, shard, - docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(), + prevSeqNo, maxSeqNo, query.query ) @@ -270,8 +381,15 @@ object DocumentReturningMonitorRunner : MonitorRunner { return matchingDocs } - private fun searchShard(monitorCtx: MonitorRunnerExecutionContext, index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits { - if (prevSeqNo?.equals(maxSeqNo) == true) { + private fun searchShard( + monitorCtx: MonitorRunnerExecutionContext, + index: String, + shard: String, + prevSeqNo: Long?, + maxSeqNo: Long, + query: String + ): SearchHits { + if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() } val boolQueryBuilder = BoolQueryBuilder() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 4b6f1a896..964b2384c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -107,14 +107,21 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { - monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(ALERT_BACKOFF_MILLIS.get(monitorCtx.settings), ALERT_BACKOFF_COUNT.get(monitorCtx.settings)) + monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( + ALERT_BACKOFF_MILLIS.get(monitorCtx.settings), + ALERT_BACKOFF_COUNT.get(monitorCtx.settings) + ) monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count -> monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count) } monitorCtx.moveAlertsRetryPolicy = - BackoffPolicy.exponentialBackoff(MOVE_ALERTS_BACKOFF_MILLIS.get(monitorCtx.settings), MOVE_ALERTS_BACKOFF_COUNT.get(monitorCtx.settings)) - monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { millis, count -> + BackoffPolicy.exponentialBackoff( + MOVE_ALERTS_BACKOFF_MILLIS.get(monitorCtx.settings), + MOVE_ALERTS_BACKOFF_COUNT.get(monitorCtx.settings) + ) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { + millis, count -> monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } @@ -137,7 +144,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon // To be safe, call this last as it depends on a number of other components being registered beforehand (client, settings, etc.) fun registerDestinationSettings(): MonitorRunnerService { monitorCtx.destinationSettings = loadDestinationSettings(monitorCtx.settings!!) - monitorCtx.destinationContextFactory = DestinationContextFactory(monitorCtx.client!!, monitorCtx.xContentRegistry!!, monitorCtx.destinationSettings!!) + monitorCtx.destinationContextFactory = + DestinationContextFactory(monitorCtx.client!!, monitorCtx.xContentRegistry!!, monitorCtx.destinationSettings!!) return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index 550e1cfd7..3a2eb219b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -23,8 +23,13 @@ import java.time.Instant object QueryLevelMonitorRunner : MonitorRunner { private val logger = LogManager.getLogger(javaClass) - override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): - MonitorRunResult { + override suspend fun runMonitor( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean + ): MonitorRunResult { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") @@ -45,10 +50,14 @@ object QueryLevelMonitorRunner : MonitorRunner { } if (!isADMonitor(monitor)) { runBlocking(InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)) { - monitorResult = monitorResult.copy(inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd)) + monitorResult = monitorResult.copy( + inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd) + ) } } else { - monitorResult = monitorResult.copy(inputResults = monitorCtx.inputService!!.collectInputResultsForADMonitor(monitor, periodStart, periodEnd)) + monitorResult = monitorResult.copy( + inputResults = monitorCtx.inputService!!.collectInputResultsForADMonitor(monitor, periodStart, periodEnd) + ) } val updatedAlerts = mutableListOf() @@ -80,20 +89,31 @@ object QueryLevelMonitorRunner : MonitorRunner { return monitorResult.copy(triggerResults = triggerResults) } - override suspend fun runAction(action: Action, ctx: TriggerExecutionContext, monitorCtx: MonitorRunnerExecutionContext, dryrun: Boolean): ActionRunResult { + override suspend fun runAction( + action: Action, + ctx: TriggerExecutionContext, + monitorCtx: MonitorRunnerExecutionContext, + dryrun: Boolean + ): ActionRunResult { return try { if (!MonitorRunnerService.isActionActionable(action, (ctx as QueryLevelTriggerExecutionContext).alert)) { return ActionRunResult(action.id, action.name, mapOf(), true, null, null) } val actionOutput = mutableMapOf() - actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) else "" + actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) + MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) + else "" actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx) if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { withContext(Dispatchers.IO) { - val destination = AlertingConfigAccessor.getDestinationInfo(monitorCtx.client!!, monitorCtx.xContentRegistry!!, action.destinationId) + val destination = AlertingConfigAccessor.getDestinationInfo( + monitorCtx.client!!, + monitorCtx.xContentRegistry!!, + action.destinationId + ) if (!destination.isAllowed(monitorCtx.allowList)) { throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}") } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt index 6fffd0032..80be2fee5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt @@ -32,6 +32,8 @@ data class Alert( val monitorUser: User?, val triggerId: String, val triggerName: String, + val findingIds: List, + val relatedDocIds: List, val state: State, val startTime: Instant, val endTime: Instant? = null, @@ -65,7 +67,7 @@ data class Alert( triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList() ) constructor( @@ -83,7 +85,7 @@ data class Alert( triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList() ) constructor( @@ -102,7 +104,27 @@ data class Alert( triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = aggregationResultBucket + aggregationResultBucket = aggregationResultBucket, findingIds = emptyList(), relatedDocIds = emptyList() + ) + + constructor( + monitor: Monitor, + trigger: DocumentLevelTrigger, + findingIds: List, + relatedDocIds: List, + startTime: Instant, + lastNotificationTime: Instant?, + state: State = State.ACTIVE, + errorMessage: String? = null, + errorHistory: List = mutableListOf(), + actionExecutionResults: List = mutableListOf(), + schemaVersion: Int = NO_SCHEMA_VERSION + ) : this( + monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, + triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, + lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, + severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, + aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds ) enum class State { @@ -122,6 +144,8 @@ data class Alert( } else null, triggerId = sin.readString(), triggerName = sin.readString(), + findingIds = sin.readStringList(), + relatedDocIds = sin.readStringList(), state = sin.readEnum(State::class.java), startTime = sin.readInstant(), endTime = sin.readOptionalInstant(), @@ -148,6 +172,8 @@ data class Alert( monitorUser?.writeTo(out) out.writeString(triggerId) out.writeString(triggerName) + out.writeStringCollection(findingIds) + out.writeStringCollection(relatedDocIds) out.writeEnum(state) out.writeInstant(startTime) out.writeOptionalInstant(endTime) @@ -176,6 +202,8 @@ data class Alert( const val MONITOR_USER_FIELD = "monitor_user" const val TRIGGER_ID_FIELD = "trigger_id" const val TRIGGER_NAME_FIELD = "trigger_name" + const val FINDING_IDS = "finding_ids" + const val RELATED_DOC_IDS = "related_doc_ids" const val STATE_FIELD = "state" const val START_TIME_FIELD = "start_time" const val LAST_NOTIFICATION_TIME_FIELD = "last_notification_time" @@ -201,6 +229,8 @@ data class Alert( var monitorUser: User? = null lateinit var triggerId: String lateinit var triggerName: String + val findingIds = mutableListOf() + val relatedDocIds = mutableListOf() lateinit var state: State lateinit var startTime: Instant lateinit var severity: String @@ -223,6 +253,18 @@ data class Alert( MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue() MONITOR_USER_FIELD -> monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) TRIGGER_ID_FIELD -> triggerId = xcp.text() + FINDING_IDS -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + findingIds.add(xcp.text()) + } + } + RELATED_DOC_IDS -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + relatedDocIds.add(xcp.text()) + } + } STATE_FIELD -> state = State.valueOf(xcp.text()) TRIGGER_NAME_FIELD -> triggerName = xcp.text() START_TIME_FIELD -> startTime = requireNotNull(xcp.instant()) @@ -264,7 +306,8 @@ data class Alert( state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime, lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = severity, - actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket + actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds, + relatedDocIds = relatedDocIds ) } @@ -297,6 +340,8 @@ data class Alert( builder.field(TRIGGER_ID_FIELD, triggerId) .field(TRIGGER_NAME_FIELD, triggerName) + .field(FINDING_IDS, findingIds.toTypedArray()) + .field(RELATED_DOC_IDS, relatedDocIds.toTypedArray()) .field(STATE_FIELD, state) .field(ERROR_MESSAGE_FIELD, errorMessage) .field(ALERT_HISTORY_FIELD, errorHistory.toTypedArray()) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt index db3f752c7..e627dcf92 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt @@ -1,6 +1,6 @@ package org.opensearch.alerting.model -import org.opensearch.alerting.model.docLevelInput.DocLevelQuery +import org.opensearch.alerting.core.model.DocLevelQuery data class DocumentExecutionContext( val queries: List, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt index ccd8689a3..683cff32d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt @@ -5,6 +5,7 @@ package org.opensearch.alerting.model +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.elasticapi.instant import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput @@ -25,9 +26,7 @@ class Finding( val monitorId: String, val monitorName: String, val index: String, - val queryId: String = NO_ID, - val queryTags: List, - val severity: String, + val docLevelQueries: List, val timestamp: Instant, val triggerId: String?, val triggerName: String? @@ -39,9 +38,7 @@ class Finding( monitorId = sin.readString(), monitorName = sin.readString(), index = sin.readString(), - queryId = sin.readString(), - queryTags = sin.readStringList(), - severity = sin.readString(), + docLevelQueries = sin.readList((DocLevelQuery)::readFrom), timestamp = sin.readInstant(), triggerId = sin.readOptionalString(), triggerName = sin.readOptionalString() @@ -54,9 +51,7 @@ class Finding( MONITOR_ID_FIELD to monitorId, MONITOR_NAME_FIELD to monitorName, INDEX_FIELD to index, - QUERY_ID_FIELD to queryId, - QUERY_TAGS_FIELD to queryTags, - SEVERITY_FIELD to severity, + QUERIES_FIELD to docLevelQueries, TIMESTAMP_FIELD to timestamp.toEpochMilli(), TRIGGER_ID_FIELD to triggerId, TRIGGER_NAME_FIELD to triggerName @@ -70,10 +65,8 @@ class Finding( .field(MONITOR_ID_FIELD, monitorId) .field(MONITOR_NAME_FIELD, monitorName) .field(INDEX_FIELD, index) - .field(QUERY_ID_FIELD, queryId) - .field(QUERY_TAGS_FIELD, queryTags.toTypedArray()) - .field(SEVERITY_FIELD, severity) - .field(TIMESTAMP_FIELD, timestamp) + .field(QUERIES_FIELD, docLevelQueries.toTypedArray()) + .field(TIMESTAMP_FIELD, timestamp.toEpochMilli()) .field(TRIGGER_ID_FIELD, triggerId) .field(TRIGGER_NAME_FIELD, triggerName) builder.endObject() @@ -87,9 +80,7 @@ class Finding( out.writeString(monitorId) out.writeString(monitorName) out.writeString(index) - out.writeString(queryId) - out.writeStringCollection(queryTags) - out.writeString(severity) + out.writeCollection(docLevelQueries) out.writeInstant(timestamp) out.writeOptionalString(triggerId) out.writeOptionalString(triggerName) @@ -101,9 +92,7 @@ class Finding( const val MONITOR_ID_FIELD = "monitor_id" const val MONITOR_NAME_FIELD = "monitor_name" const val INDEX_FIELD = "index" - const val QUERY_ID_FIELD = "query_id" - const val QUERY_TAGS_FIELD = "query_tags" - const val SEVERITY_FIELD = "severity" + const val QUERIES_FIELD = "queries" const val TIMESTAMP_FIELD = "timestamp" const val TRIGGER_ID_FIELD = "trigger_id" const val TRIGGER_NAME_FIELD = "trigger_name" @@ -116,9 +105,7 @@ class Finding( lateinit var monitorId: String lateinit var monitorName: String lateinit var index: String - var queryId: String = NO_ID - val queryTags: MutableList = mutableListOf() - lateinit var severity: String + val queries: MutableList = mutableListOf() lateinit var timestamp: Instant lateinit var triggerId: String lateinit var triggerName: String @@ -133,14 +120,12 @@ class Finding( MONITOR_ID_FIELD -> monitorId = xcp.text() MONITOR_NAME_FIELD -> monitorName = xcp.text() INDEX_FIELD -> index = xcp.text() - QUERY_ID_FIELD -> queryId = xcp.text() - QUERY_TAGS_FIELD -> { + QUERIES_FIELD -> { ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { - queryTags.add(xcp.text()) + queries.add(DocLevelQuery.parse(xcp)) } } - SEVERITY_FIELD -> severity = xcp.text() TIMESTAMP_FIELD -> timestamp = requireNotNull(xcp.instant()) TRIGGER_ID_FIELD -> triggerId = xcp.text() TRIGGER_NAME_FIELD -> triggerName = xcp.text() @@ -153,9 +138,7 @@ class Finding( monitorId = monitorId, monitorName = monitorName, index = index, - queryId = queryId, - queryTags = queryTags, - severity = severity, + docLevelQueries = queries, timestamp = timestamp, triggerId = triggerId, triggerName = triggerName diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 4d6a41771..3d747804c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -111,7 +111,7 @@ data class Monitor( User(sin) } else null, schemaVersion = sin.readInt(), - inputs = sin.readList(::SearchInput), + inputs = sin.readList((Input)::readFrom), triggers = sin.readList((Trigger)::readFrom), lastRunContext = suppressWarning(sin.readMap()), uiMetadata = suppressWarning(sin.readMap()) @@ -187,7 +187,13 @@ data class Monitor( out.writeBoolean(user != null) user?.writeTo(out) out.writeInt(schemaVersion) - out.writeCollection(inputs) + // Outputting type with each Input so that the generic Input.readFrom() can read it + out.writeVInt(inputs.size) + inputs.forEach { + if (it is SearchInput) out.writeEnum(Input.Type.SEARCH_INPUT) + else out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) + it.writeTo(out) + } // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeVInt(triggers.size) triggers.forEach { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt index 318a2c712..e131c047a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -17,6 +17,8 @@ data class DocumentLevelTriggerExecutionContext( override val periodStart: Instant, override val periodEnd: Instant, val alert: Alert? = null, + val triggeredDocs: List, + val relatedFindings: List, override val error: Exception? = null ) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { @@ -26,7 +28,7 @@ data class DocumentLevelTriggerExecutionContext( alert: Alert? = null ) : this( monitor, trigger, emptyList(), Instant.now(), Instant.now(), - alert, null + alert, emptyList(), emptyList(), null ) /** diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 3a6f10a54..3575d5792 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -19,10 +19,12 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.DocumentReturningMonitorRunner import org.opensearch.alerting.action.IndexMonitorAction import org.opensearch.alerting.action.IndexMonitorRequest import org.opensearch.alerting.action.IndexMonitorResponse import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.core.model.SearchInput @@ -370,6 +372,11 @@ class TransportIndexMonitorAction @Inject constructor( } private fun indexMonitor() { + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] + val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + request.monitor = request.monitor.copy(lastRunContext = lastRunContext) + } request.monitor = request.monitor.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) @@ -435,6 +442,15 @@ class TransportIndexMonitorAction @Inject constructor( return } + if ( + request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && + request.monitor.lastRunContext.toMutableMap().isNullOrEmpty() + ) { + val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] + val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + request.monitor = request.monitor.copy(lastRunContext = lastRunContext) + } + // If both are enabled, use the current existing monitor enabled time, otherwise the next execution will be // incorrect. if (request.monitor.enabled && currentMonitor.enabled) diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index 4ebba38fa..abb377b6c 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -83,6 +83,22 @@ } } }, + "finding_ids": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "related_doc_ids": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, "state": { "type": "keyword" }, diff --git a/alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json new file mode 100644 index 000000000..aeb9d324e --- /dev/null +++ b/alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json @@ -0,0 +1,88 @@ +{ + "dynamic": "strict", + "_routing": { + "required": true + }, + "_meta" : { + "schema_version": 1 + }, + "properties": { + "schema_version": { + "type": "integer" + }, + "related_doc_ids": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "monitor_id": { + "type": "keyword" + }, + "monitor_name": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "index": { + "type": "keyword" + }, + "query_id": { + "type": "keyword" + }, + "query_tags": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "severity": { + "type": "keyword" + }, + "timestamp": { + "type": "date" + }, + "monitor_user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + } + } +} \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 26d00841b..52cfb5c12 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.DocumentLevelTrigger +import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.destination.Destination @@ -101,7 +102,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { response.entity.content ).map() assertUserNull(monitorJson as HashMap) - return monitor.copy(id = monitorJson["_id"] as String, version = (monitorJson["_version"] as Int).toLong()) + + return getMonitor(monitorId = monitorJson["_id"] as String) } protected fun createMonitor(monitor: Monitor, refresh: Boolean = true): Monitor { @@ -435,7 +437,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } protected fun createRandomDocumentMonitor(refresh: Boolean = false, withMetadata: Boolean = false): Monitor { - val monitor = randomDocumentLevelMonitor(withMetadata = withMetadata) + val monitor = randomDocumentReturningMonitor(withMetadata = withMetadata) val monitorId = createMonitor(monitor, refresh).id if (withMetadata) { return getMonitor(monitorId = monitorId, header = BasicHeader(HttpHeaders.USER_AGENT, "OpenSearch-Dashboards")) @@ -479,6 +481,51 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return monitor.copy(id = id, version = version) } + // TODO: understand why doc alerts wont work with the normal search Alerts function + protected fun searchAlertsWithFilter( + monitor: Monitor, + indices: String = AlertIndices.ALERT_INDEX, + refresh: Boolean = true + ): List { + if (refresh) refreshIndex(indices) + + val request = """ + { "version" : true, + "query": { "match_all": {} } + } + """.trimIndent() + val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + return searchResponse.hits.hits.map { + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + Alert.parse(xcp, it.id, it.version) + }.filter { alert -> alert.monitorId == monitor.id } + } + + protected fun searchFindings( + monitor: Monitor, + indices: String = ".opensearch-alerting-findings", + refresh: Boolean = true + ): List { + if (refresh) refreshIndex(indices) + + val request = """ + { "version" : true, + "query": { "match_all": {} } + } + """.trimIndent() + val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + return searchResponse.hits.hits.map { + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + Finding.parse(xcp, it.id) + }.filter { finding -> finding.monitorId == monitor.id } + } + protected fun searchAlerts(monitor: Monitor, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List { if (refresh) refreshIndex(indices) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt new file mode 100644 index 000000000..d65cbd527 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.model.Alert +import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.MILLIS + +class DocumentMonitorRunnerIT : AlertingRestTestCase() { + + fun `test execute monitor with dryrun`() { + val alert = Alert( + monitorId = "monitorId", + monitorName = "monitorName", + monitorVersion = 1L, + monitorUser = null, + triggerId = "triggerId", + triggerName = "triggerName", + findingIds = emptyList(), + relatedDocIds = emptyList(), + state = Alert.State.COMPLETED, + startTime = Instant.now(), + errorHistory = emptyList(), + severity = "sev3", + actionExecutionResults = emptyList() + ) + createAlert(alert) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentReturningMonitor( + inputs = listOf(docReturningInput), + triggers = listOf(randomDocumentReturningTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (actionResult in triggerResult.objectMap("action_results").values) { + @Suppress("UNCHECKED_CAST") val actionOutput = actionResult["output"] as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test execute monitor returns search result with dryrun`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger)) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("5")) + } + + fun `test execute monitor generates alerts and findings`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1", "5"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + // TODO: modify findings such that there is a finding per document, so this test will need to be modified + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertEquals("Findings saved for test monitor", "1", findings[0].relatedDocId) + assertEquals("Findings saved for test monitor", "5", findings[1].relatedDocId) + } + + @Suppress("UNCHECKED_CAST") + /** helper that returns a field in a json map whose values are all json objects */ + private fun Map.objectMap(key: String): Map> { + return this[key] as Map> + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 736a91189..3fa37d4f1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -11,6 +11,8 @@ import org.apache.http.HttpEntity import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.alerting.core.model.ClusterMetricsInput +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule @@ -41,8 +43,6 @@ import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup -import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput -import org.opensearch.alerting.model.docLevelInput.DocLevelQuery import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.Request import org.opensearch.client.RequestOptions @@ -152,6 +152,24 @@ fun randomClusterMetricsMonitor( ) } +fun randomDocumentReturningMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false +): Monitor { + return Monitor( + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + lastRunContext = mapOf(), uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), @@ -189,6 +207,25 @@ fun randomBucketLevelTrigger( fun randomActionsForBucketLevelTrigger(min: Int = 0, max: Int = 10, destinationId: String = ""): List = (min..randomInt(max)).map { randomActionWithPolicy(destinationId = destinationId) } +fun randomDocumentReturningTrigger( + id: String = UUIDs.base64UUID(), + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + severity: String = "1", + condition: Script = randomScript(), + actions: List = mutableListOf(), + destinationId: String = "" +): DocumentLevelTrigger { + return DocumentLevelTrigger( + id = id, + name = name, + severity = severity, + condition = condition, + actions = if (actions.isEmpty() && destinationId.isNotBlank()) + (0..randomInt(10)).map { randomAction(destinationId = destinationId) } + else actions + ) +} + fun randomBucketSelectorExtAggregationBuilder( name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), @@ -333,10 +370,9 @@ fun randomDocLevelQuery( id: String = OpenSearchRestTestCase.randomAlphaOfLength(10), query: String = OpenSearchRestTestCase.randomAlphaOfLength(10), severity: String = "${randomInt(5)}", - tags: List = mutableListOf(0..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) }, - actions: List = mutableListOf(0..randomInt(10)).map { randomAction() } + tags: List = mutableListOf(0..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) } ): DocLevelQuery { - return DocLevelQuery(id = id, query = query, severity = severity, tags = tags, actions = actions) + return DocLevelQuery(id = id, query = query, severity = severity, tags = tags) } fun randomDocLevelMonitorInput( @@ -353,9 +389,7 @@ fun randomFinding( monitorId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), monitorName: String = OpenSearchRestTestCase.randomAlphaOfLength(10), index: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - queryId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - queryTags: MutableList = mutableListOf(), - severity: String = "${randomInt(5)}", + docLevelQueries: List = listOf(randomDocLevelQuery()), timestamp: Instant = Instant.now(), triggerId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), triggerName: String = OpenSearchRestTestCase.randomAlphaOfLength(10) @@ -366,9 +400,7 @@ fun randomFinding( monitorId = monitorId, monitorName = monitorName, index = index, - queryId = queryId, - queryTags = queryTags, - severity = severity, + docLevelQueries = docLevelQueries, timestamp = timestamp, triggerId = triggerId, triggerName = triggerName @@ -632,21 +664,3 @@ fun assertUserNull(map: Map) { fun assertUserNull(monitor: Monitor) { assertNull("User is not null", monitor.user) } - -fun randomDocumentLevelMonitor( - name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - user: User = randomUser(), - inputs: List = listOf(SearchInput(emptyList(), SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))), - schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), - enabled: Boolean = randomBoolean(), - triggers: List = (1..randomInt(10)).map { randomDocLevelTrigger() }, - enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false -): Monitor { - return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, - schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - lastRunContext = mapOf(), uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() - ) -} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt index 72c8198a6..13d98f599 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt @@ -22,7 +22,7 @@ class AcknowledgeAlertResponseTests : OpenSearchTestCase() { val acknowledged = mutableListOf( Alert( "1234", 0L, 1, "monitor-1234", "test-monitor", 0L, randomUser(), - "trigger-14", "test-trigger", Alert.State.ACKNOWLEDGED, + "trigger-14", "test-trigger", ArrayList(), ArrayList(), Alert.State.ACKNOWLEDGED, Instant.now(), Instant.now(), Instant.now(), Instant.now(), null, ArrayList(), "sev-2", ArrayList(), null ) @@ -30,7 +30,7 @@ class AcknowledgeAlertResponseTests : OpenSearchTestCase() { val failed = mutableListOf( Alert( "1234", 0L, 1, "monitor-1234", "test-monitor", 0L, randomUser(), - "trigger-14", "test-trigger", Alert.State.ERROR, Instant.now(), Instant.now(), + "trigger-14", "test-trigger", ArrayList(), ArrayList(), Alert.State.ERROR, Instant.now(), Instant.now(), Instant.now(), Instant.now(), null, mutableListOf(AlertError(Instant.now(), "Error msg")), "sev-2", mutableListOf(ActionExecutionResult("7890", null, 0)), null ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt index d4280daaa..8713e0065 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt @@ -42,6 +42,8 @@ class GetAlertsResponseTests : OpenSearchTestCase() { randomUser(), "triggerId", "triggerName", + Collections.emptyList(), + Collections.emptyList(), Alert.State.ACKNOWLEDGED, Instant.MIN, null, @@ -78,6 +80,8 @@ class GetAlertsResponseTests : OpenSearchTestCase() { null, "triggerId", "triggerName", + Collections.emptyList(), + Collections.emptyList(), Alert.State.ACKNOWLEDGED, now, null, @@ -93,8 +97,8 @@ class GetAlertsResponseTests : OpenSearchTestCase() { var actualXContentString = req.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val expectedXContentString = "{\"alerts\":[{\"id\":\"id\",\"version\":0,\"monitor_id\":\"monitorId\"," + "\"schema_version\":0,\"monitor_version\":0,\"monitor_name\":\"monitorName\"," + - "\"trigger_id\":\"triggerId\"," + - "\"trigger_name\":\"triggerName\",\"state\":\"ACKNOWLEDGED\",\"error_message\":null,\"alert_history\":[]," + + "\"trigger_id\":\"triggerId\",\"trigger_name\":\"triggerName\"," + + "\"finding_ids\":[],\"related_doc_ids\":[],\"state\":\"ACKNOWLEDGED\",\"error_message\":null,\"alert_history\":[]," + "\"severity\":\"severity\",\"action_execution_results\":[],\"start_time\":" + now.toEpochMilli() + ",\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}],\"totalAlerts\":1}" assertEquals(expectedXContentString, actualXContentString) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt index 4311dee19..13b2ebb63 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt @@ -5,10 +5,9 @@ package org.opensearch.alerting.model +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.elasticapi.string -import org.opensearch.alerting.model.action.Action -import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput -import org.opensearch.alerting.model.docLevelInput.DocLevelQuery import org.opensearch.alerting.randomDocLevelMonitorInput import org.opensearch.alerting.randomDocLevelQuery import org.opensearch.common.xcontent.ToXContent @@ -29,9 +28,6 @@ class DocLevelMonitorInputTests : OpenSearchTestCase() { assertEquals("Template args 'query' field does not match:", templateArgs[DocLevelQuery.QUERY_FIELD], query.query) assertEquals("Template args 'severity' field does not match:", templateArgs[DocLevelQuery.SEVERITY_FIELD], query.severity) assertEquals("Template args 'tags' field does not match:", templateArgs[DocLevelQuery.TAGS_FIELD], query.tags) - - val expectedActions = query.actions.map { mapOf(Action.NAME_FIELD to it.name) } - assertEquals("Template args 'actions' field does not match:", templateArgs[DocLevelQuery.ACTIONS_FIELD], expectedActions) } fun `testing DocLevelMonitorInput asTemplateArgs`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt index 1ff468af1..dd73908d0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt @@ -25,9 +25,7 @@ class FindingTests : OpenSearchTestCase() { templateArgs[Finding.MONITOR_NAME_FIELD], finding.monitorName ) - assertEquals("Template args 'queryId' field does not match:", templateArgs[Finding.QUERY_ID_FIELD], finding.queryId) - assertEquals("Template args 'queryTags' field does not match:", templateArgs[Finding.QUERY_TAGS_FIELD], finding.queryTags) - assertEquals("Template args 'severity' field does not match:", templateArgs[Finding.SEVERITY_FIELD], finding.severity) + assertEquals("Template args 'queries' field does not match:", templateArgs[Finding.QUERIES_FIELD], finding.docLevelQueries) assertEquals( "Template args 'timestamp' field does not match:", templateArgs[Finding.TIMESTAMP_FIELD], diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index bff48231e..9d2337df4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -9,6 +9,7 @@ import org.apache.http.entity.ContentType import org.apache.http.message.BasicHeader import org.apache.http.nio.entity.NStringEntity import org.opensearch.alerting.ALERTING_BASE_URI +import org.opensearch.alerting.ALWAYS_RUN import org.opensearch.alerting.ANOMALY_DETECTOR_INDEX import org.opensearch.alerting.AlertingRestTestCase import org.opensearch.alerting.DESTINATION_BASE_URI @@ -16,6 +17,8 @@ import org.opensearch.alerting.LEGACY_OPENDISTRO_ALERTING_BASE_URI import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.anomalyDetectorIndexMapping import org.opensearch.alerting.core.model.CronSchedule +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.settings.ScheduledJobSettings @@ -32,7 +35,8 @@ import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomAnomalyDetector import org.opensearch.alerting.randomAnomalyDetectorWithUser import org.opensearch.alerting.randomBucketLevelTrigger -import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentReturningMonitor +import org.opensearch.alerting.randomDocumentReturningTrigger import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomThrottle @@ -1112,7 +1116,12 @@ class MonitorRestApiIT : AlertingRestTestCase() { @Throws(Exception::class) fun `test creating a document monitor`() { - val monitor = randomDocumentLevelMonitor() + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) val createResponse = client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) @@ -1128,7 +1137,14 @@ class MonitorRestApiIT : AlertingRestTestCase() { @Throws(Exception::class) fun `test getting a document level monitor`() { - val monitor = createRandomDocumentMonitor() + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger), user = null) + ) val storedMonitor = getMonitor(monitor.id) @@ -1137,7 +1153,13 @@ class MonitorRestApiIT : AlertingRestTestCase() { @Throws(Exception::class) fun `test updating conditions for a doc-level monitor`() { - val monitor = createRandomDocumentMonitor() + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + val updatedTriggers = listOf( DocumentLevelTrigger( name = "foo", @@ -1162,7 +1184,12 @@ class MonitorRestApiIT : AlertingRestTestCase() { @Throws(Exception::class) fun `test deleting a document level monitor`() { - val monitor = createRandomDocumentMonitor() + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", severity = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) val deleteResponse = client().makeRequest("DELETE", monitor.relativeUrl()) assertEquals("Delete failed", RestStatus.OK, deleteResponse.restStatus()) @@ -1174,7 +1201,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test creating a document monitor with error trigger`() { val trigger = randomQueryLevelTrigger() try { - val monitor = randomDocumentLevelMonitor(triggers = listOf(trigger)) + val monitor = randomDocumentReturningMonitor(triggers = listOf(trigger)) client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) fail("Monitor with illegal trigger should be rejected.") } catch (e: IllegalArgumentException) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelMonitorInput.kt similarity index 97% rename from alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt rename to core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelMonitorInput.kt index 97d75a079..fbeba6007 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelMonitorInput.kt @@ -3,9 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.model.docLevelInput +package org.opensearch.alerting.core.model -import org.opensearch.alerting.core.model.Input import org.opensearch.common.CheckedFunction import org.opensearch.common.ParseField import org.opensearch.common.io.stream.StreamInput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelQuery.kt similarity index 74% rename from alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt rename to core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelQuery.kt index f1d0cf367..8a4c235a4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/DocLevelQuery.kt @@ -3,9 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.model.docLevelInput +package org.opensearch.alerting.core.model -import org.opensearch.alerting.model.action.Action import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -20,8 +19,7 @@ data class DocLevelQuery( val id: String = NO_ID, val query: String, val severity: String, - val tags: List = mutableListOf(), - val actions: List = mutableListOf() + val tags: List = mutableListOf() ) : Writeable, ToXContentObject { @Throws(IOException::class) @@ -29,8 +27,7 @@ data class DocLevelQuery( sin.readString(), // id sin.readString(), // query sin.readString(), // severity - sin.readStringList(), // tags - sin.readList(::Action) // actions + sin.readStringList() // tags ) fun asTemplateArg(): Map { @@ -38,8 +35,7 @@ data class DocLevelQuery( QUERY_ID_FIELD to id, QUERY_FIELD to query, SEVERITY_FIELD to severity, - TAGS_FIELD to tags, - ACTIONS_FIELD to actions.map { it.asTemplateArg() } + TAGS_FIELD to tags ) } @@ -49,7 +45,6 @@ data class DocLevelQuery( out.writeString(query) out.writeString(severity) out.writeStringCollection(tags) - out.writeCollection(actions) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -58,7 +53,6 @@ data class DocLevelQuery( .field(QUERY_FIELD, query) .field(SEVERITY_FIELD, severity) .field(TAGS_FIELD, tags.toTypedArray()) - .field(ACTIONS_FIELD, actions.toTypedArray()) .endObject() return builder } @@ -68,7 +62,6 @@ data class DocLevelQuery( const val QUERY_FIELD = "query" const val SEVERITY_FIELD = "severity" const val TAGS_FIELD = "tags" - const val ACTIONS_FIELD = "actions" const val NO_ID = "" @@ -78,7 +71,6 @@ data class DocLevelQuery( lateinit var query: String lateinit var severity: String val tags: MutableList = mutableListOf() - val actions: MutableList = mutableListOf() ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -95,12 +87,6 @@ data class DocLevelQuery( tags.add(xcp.text()) } } - ACTIONS_FIELD -> { - ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) - while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { - actions.add(Action.parse(xcp)) - } - } } } @@ -108,14 +94,13 @@ data class DocLevelQuery( id = id, query = query, severity = severity, - tags = tags, - actions = actions + tags = tags ) } @JvmStatic @Throws(IOException::class) - fun readFrom(sin: StreamInput): DocLevelMonitorInput { - return DocLevelMonitorInput(sin) + fun readFrom(sin: StreamInput): DocLevelQuery { + return DocLevelQuery(sin) } } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/Input.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/Input.kt index f7700e05f..ad41ed91a 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/Input.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/Input.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting.core.model +import org.opensearch.alerting.core.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD +import org.opensearch.alerting.core.model.SearchInput.Companion.SEARCH_FIELD +import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentParser @@ -13,6 +16,16 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import java.io.IOException interface Input : Writeable, ToXContentObject { + + enum class Type(val value: String) { + DOCUMENT_LEVEL_INPUT(DOC_LEVEL_INPUT_FIELD), + SEARCH_INPUT(SEARCH_FIELD); + + override fun toString(): String { + return value + } + } + companion object { @Throws(IOException::class) @@ -20,10 +33,26 @@ interface Input : Writeable, ToXContentObject { ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) - val input = xcp.namedObject(Input::class.java, xcp.currentName(), null) + val input = if (xcp.currentName() == Type.SEARCH_INPUT.value) { + SearchInput.parseInner(xcp) + } else { + DocLevelMonitorInput.parse(xcp) + } ensureExpectedToken(Token.END_OBJECT, xcp.nextToken(), xcp) return input } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Input { + return when (val type = sin.readEnum(Input.Type::class.java)) { + Type.DOCUMENT_LEVEL_INPUT -> DocLevelMonitorInput(sin) + Type.SEARCH_INPUT -> SearchInput(sin) + // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns + // enum can be null in Java + else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") + } + } } fun name(): String diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/SearchInput.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/SearchInput.kt index 1688cc540..6e2d075eb 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/SearchInput.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/SearchInput.kt @@ -53,7 +53,7 @@ data class SearchInput(val indices: List, val query: SearchSourceBuilder val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField("search"), CheckedFunction { parseInner(it) }) @JvmStatic @Throws(IOException::class) - private fun parseInner(xcp: XContentParser): SearchInput { + fun parseInner(xcp: XContentParser): SearchInput { val indices = mutableListOf() lateinit var searchSourceBuilder: SearchSourceBuilder