From 28f401d29fb5da10981789f317aa5f4254c7bc06 Mon Sep 17 00:00:00 2001 From: Mohammad Qureshi <47198598+qreshi@users.noreply.github.com> Date: Fri, 27 Aug 2021 14:39:55 -0700 Subject: [PATCH] Update MonitorRunner for Bucket-Level Alerting (#155) * Update MonitorRunner for Bucket-Level Alerting Signed-off-by: Mohammad Qureshi * Update regressed comment in MonitorRunnerIT Signed-off-by: Mohammad Qureshi * Add TODO to break down runBucketLevelMonitor method in MonitorRunner Signed-off-by: Mohammad Qureshi --- alerting/build.gradle | 5 + .../org/opensearch/alerting/MonitorRunner.kt | 364 ++++++++++++++---- .../TransportExecuteMonitorAction.kt | 7 +- .../opensearch/alerting/util/AlertingUtils.kt | 17 + .../opensearch/alerting/MonitorRunnerIT.kt | 344 +++++++++++++++-- .../org/opensearch/alerting/TestHelpers.kt | 107 +++-- ...ucketSelectorExtAggregationBuilderTests.kt | 10 +- .../BucketSelectorExtAggregatorTests.kt | 43 ++- 8 files changed, 741 insertions(+), 156 deletions(-) diff --git a/alerting/build.gradle b/alerting/build.gradle index 6e43940d5..1f31293c0 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -69,6 +69,10 @@ configurations.all { } } +configurations.testCompile { + exclude module: "securemock" +} + dependencies { compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}" @@ -82,6 +86,7 @@ dependencies { implementation "com.github.seancfoley:ipaddress:5.3.3" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" + testCompile "org.mockito:mockito-core:2.23.0" } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 38a9ca988..b1957e0ee 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -34,47 +34,33 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager -import org.opensearch.ExceptionsHelper -import org.opensearch.action.DocWriteRequest import org.opensearch.action.bulk.BackoffPolicy -import org.opensearch.action.bulk.BulkRequest -import org.opensearch.action.bulk.BulkResponse -import org.opensearch.action.delete.DeleteRequest -import org.opensearch.action.index.IndexRequest -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse -import org.opensearch.alerting.alerts.AlertError import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.moveAlerts import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.elasticapi.InjectorContextElement -import org.opensearch.alerting.elasticapi.convertToMap -import org.opensearch.alerting.elasticapi.firstFailureOrNull import org.opensearch.alerting.elasticapi.retry -import org.opensearch.alerting.elasticapi.suspendUntil -import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert -import org.opensearch.alerting.model.Alert.State.ACKNOWLEDGED -import org.opensearch.alerting.model.Alert.State.ACTIVE -import org.opensearch.alerting.model.Alert.State.COMPLETED -import org.opensearch.alerting.model.Alert.State.DELETED -import org.opensearch.alerting.model.Alert.State.ERROR import org.opensearch.alerting.model.AlertingConfigAccessor -import org.opensearch.alerting.model.InputRunResults +import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult -import org.opensearch.alerting.model.Trigger -import org.opensearch.alerting.model.TriggerRunResult +import org.opensearch.alerting.model.QueryLevelTrigger +import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.Action.Companion.MESSAGE import org.opensearch.alerting.model.action.Action.Companion.MESSAGE_ID import org.opensearch.alerting.model.action.Action.Companion.SUBJECT +import org.opensearch.alerting.model.action.ActionExecutionScope +import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.model.action.PerAlertActionScope import org.opensearch.alerting.model.destination.DestinationContextFactory +import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext -import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT @@ -85,31 +71,21 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE -import org.opensearch.alerting.util.IndexUtils -import org.opensearch.alerting.util.addUserBackendRolesFilter +import org.opensearch.alerting.util.getActionScope +import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.isAllowed +import org.opensearch.alerting.util.isBucketLevelMonitor import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.Strings -import org.opensearch.common.bytes.BytesReference import org.opensearch.common.component.AbstractLifecycleComponent import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.common.xcontent.XContentParser -import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.common.xcontent.XContentType -import org.opensearch.index.query.QueryBuilders -import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptService -import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript -import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.threadpool.ThreadPool import java.time.Instant import kotlin.coroutines.CoroutineContext @@ -195,14 +171,14 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunner { retryPolicy = BackoffPolicy.constantBackoff(ALERT_BACKOFF_MILLIS.get(settings), ALERT_BACKOFF_COUNT.get(settings)) - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { - millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count -> + retryPolicy = BackoffPolicy.constantBackoff(millis, count) } moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(MOVE_ALERTS_BACKOFF_MILLIS.get(settings), MOVE_ALERTS_BACKOFF_COUNT.get(settings)) - clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { - millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) + clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { millis, count -> + moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } allowList = ALLOW_LIST.get(settings) @@ -278,35 +254,29 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { throw IllegalArgumentException("Invalid job type") } - launch { runMonitor(job, periodStart, periodEnd) } + launch { + if (job.isBucketLevelMonitor()) { + runBucketLevelMonitor(job, periodStart, periodEnd) + } else { + runQueryLevelMonitor(job, periodStart, periodEnd) + } + } } - suspend fun runMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): MonitorRunResult { - /* - * We need to handle 3 cases: - * 1. Monitors created by older versions and never updated. These monitors wont have User details in the - * monitor object. `monitor.user` will be null. Insert `all_access, AmazonES_all_access` role. - * 2. Monitors are created when security plugin is disabled, these will have empty User object. - * (`monitor.user.name`, `monitor.user.roles` are empty ) - * 3. Monitors are created when security plugin is enabled, these will have an User object. - */ - val roles = if (monitor.user == null) { - // fixme: discuss and remove hardcoded to settings? - settings.getAsList("", listOf("all_access", "AmazonES_all_access")) - } else { - monitor.user.roles - } + suspend fun runQueryLevelMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): + MonitorRunResult { + val roles = getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") if (periodStart == periodEnd) { logger.warn("Start and end time are the same: $periodStart. This monitor will probably only run once.") } - var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) val currentAlerts = try { alertIndices.createOrUpdateAlertIndex() alertIndices.createOrUpdateInitialHistoryIndex() - alertService.loadCurrentAlerts(monitor) + alertService.loadCurrentAlertsForQueryLevelMonitor(monitor) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id @@ -322,22 +292,24 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } val updatedAlerts = mutableListOf() - val triggerResults = mutableMapOf() + val triggerResults = mutableMapOf() for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] - val triggerCtx = TriggerExecutionContext(monitor, trigger, monitorResult, currentAlert) - val triggerResult = triggerService.runTrigger(monitor, trigger, triggerCtx) + val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) + val triggerResult = triggerService.runQueryLevelTrigger(monitor, trigger, triggerCtx) triggerResults[trigger.id] = triggerResult - if (triggerService.isTriggerActionable(triggerCtx, triggerResult)) { + if (triggerService.isQueryLevelTriggerActionable(triggerCtx, triggerResult)) { val actionCtx = triggerCtx.copy(error = monitorResult.error ?: triggerResult.error) for (action in trigger.actions) { triggerResult.actionResults[action.id] = runAction(action, actionCtx, dryrun) } } - val updatedAlert = alertService.composeAlert(triggerCtx, triggerResult, - monitorResult.alertError() ?: triggerResult.alertError()) + val updatedAlert = alertService.composeQueryLevelAlert( + triggerCtx, triggerResult, + monitorResult.alertError() ?: triggerResult.alertError() + ) if (updatedAlert != null) updatedAlerts += updatedAlert } @@ -348,6 +320,201 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return monitorResult.copy(triggerResults = triggerResults) } + // TODO: This method has grown very large with all the business logic that has been added. + // Revisit this during refactoring and break it down to be more manageable. + suspend fun runBucketLevelMonitor( + monitor: Monitor, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean = false + ): MonitorRunResult { + val roles = getRolesForMonitor(monitor) + logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") + + if (periodStart == periodEnd) { + logger.warn("Start and end time are the same: $periodStart. This monitor will probably only run once.") + } + + var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + val currentAlerts = try { + alertIndices.createOrUpdateAlertIndex() + alertIndices.createOrUpdateInitialHistoryIndex() + alertService.loadCurrentAlertsForBucketLevelMonitor(monitor) + } catch (e: Exception) { + // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts + val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id + logger.error("Error loading alerts for monitor: $id", e) + return monitorResult.copy(error = e) + } + + /* + * Since the aggregation query can consist of multiple pages, each iteration of the do-while loop only has partial results + * from the runBucketLevelTrigger results whereas the currentAlerts has a complete view of existing Alerts. This means that + * it can be confirmed if an Alert is new or de-duped local to the do-while loop if a key appears or doesn't appear in + * the currentAlerts. However, it cannot be guaranteed that an existing Alert is COMPLETED until all pages have been + * iterated over (since a bucket that did not appear in one page of the aggregation results, could appear in a later page). + * + * To solve for this, the currentAlerts will be acting as a list of "potentially completed alerts" throughout the execution. + * When categorizing the Alerts in each iteration, de-duped Alerts will be removed from the currentAlerts map + * (for the Trigger being executed) and the Alerts left in currentAlerts after all pages have been iterated through can + * be marked as COMPLETED since they were never de-duped. + * + * Meanwhile, the nextAlerts map will contain Alerts that will exist at the end of this Monitor execution. It is a compilation + * across Triggers because in the case of executing actions at a PER_EXECUTION frequency, all the Alerts are needed before executing + * Actions which can only be done once all of the aggregation results (and Triggers given the pagination logic) have been evaluated. + */ + val triggerResults = mutableMapOf() + val triggerContexts = mutableMapOf() + val nextAlerts = mutableMapOf>>() + do { + // TODO: Since a composite aggregation is being used for the input query, the total bucket count cannot be determined. + // If a setting is imposed that limits buckets that can be processed for Bucket-Level Monitors, we'd need to iterate over + // the buckets until we hit that threshold. In that case, we'd want to exit the execution without creating any alerts since the + // buckets we iterate over before hitting the limit is not deterministic. Is there a better way to fail faster in this case? + runBlocking(InjectorContextElement(monitor.id, settings, threadPool.threadContext, roles)) { + monitorResult = monitorResult.copy( + inputResults = inputService.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults) + ) + } + + for (trigger in monitor.triggers) { + // The currentAlerts map is formed by iterating over the Monitor's Triggers as keys so null should not be returned here + val currentAlertsForTrigger = currentAlerts[trigger]!! + val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger as BucketLevelTrigger, monitorResult) + triggerContexts[trigger.id] = triggerCtx + val triggerResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx) + triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id]) + + // TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can + // be refactored to use a map instead + val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor( + monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList() + ).toMutableMap() + val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList()) + var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList()) + + /* + * Index de-duped and new Alerts here so they are available at the time the Actions are executed. + * + * The new Alerts have to be returned and saved back with their indexed doc ID to prevent duplicate documents + * when the Alerts are updated again after Action execution. + * + * Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions + * will still execute with the Alert information in the ctx but the Alerts may not be visible. + */ + alertService.saveAlerts(dedupedAlerts, retryPolicy) + newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy) + + // Store deduped and new Alerts to accumulate across pages + if (!nextAlerts.containsKey(trigger.id)) { + nextAlerts[trigger.id] = mutableMapOf( + AlertCategory.DEDUPED to mutableListOf(), + AlertCategory.NEW to mutableListOf(), + AlertCategory.COMPLETED to mutableListOf() + ) + } + nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)?.addAll(dedupedAlerts) + nextAlerts[trigger.id]?.get(AlertCategory.NEW)?.addAll(newAlerts) + } + } while (monitorResult.inputResults.afterKeysPresent()) + + // The completed Alerts are whatever are left in the currentAlerts + currentAlerts.forEach { (trigger, keysToAlertsMap) -> + nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap)) + } + + for (trigger in monitor.triggers) { + val alertsToUpdate = mutableSetOf() + val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) ?: mutableListOf() + val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() + val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() + + // All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop + val triggerCtx = triggerContexts[trigger.id]!! + val triggerResult = triggerResults[trigger.id]!! + for (action in trigger.actions) { + if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) { + val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope + for (alertCategory in perAlertActionFrequency.actionableAlerts) { + val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() + for (alert in alertsToExecuteActionsFor) { + if (isBucketLevelTriggerActionThrottled(action, alert)) continue + + val actionCtx = getActionContextForAlertCategory( + alertCategory, alert, triggerCtx, monitorResult.error ?: triggerResult.error + ) + // AggregationResultBucket should not be null here + val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() + if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) { + triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf() + } + + val actionResult = runAction(action, actionCtx, dryrun) + triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult) + alertsToUpdate.add(alert) + } + } + } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION) { + // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action + if (dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue + + val actionCtx = triggerCtx.copy( + dedupedAlerts = dedupedAlerts, + newAlerts = newAlerts, + completedAlerts = completedAlerts, + error = monitorResult.error ?: triggerResult.error + ) + val actionResult = runAction(action, actionCtx, dryrun) + // Save the Action run result for every Alert + for (alert in (dedupedAlerts + newAlerts + completedAlerts)) { + val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() + if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) { + triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf() + } + triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult) + alertsToUpdate.add(alert) + } + } + } + + // Alerts are only added to alertsToUpdate after Action execution meaning the action results for it should be present + // in the actionResultsMap but returning a default value when accessing the map to be safe. + val updatedAlerts = alertsToUpdate.map { alert -> + val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() + val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap()) + alertService.updateActionResultsForBucketLevelAlert( + alert, + actionResults, + // TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action + monitorResult.alertError() ?: triggerResult.alertError() + ) + } + + // Update Alerts with action execution results + alertService.saveAlerts(updatedAlerts, retryPolicy) + } + + return monitorResult.copy(triggerResults = triggerResults) + } + + private fun getRolesForMonitor(monitor: Monitor): List { + /* + * We need to handle 3 cases: + * 1. Monitors created by older versions and never updated. These monitors wont have User details in the + * monitor object. `monitor.user` will be null. Insert `all_access, AmazonES_all_access` role. + * 2. Monitors are created when security plugin is disabled, these will have empty User object. + * (`monitor.user.name`, `monitor.user.roles` are empty ) + * 3. Monitors are created when security plugin is enabled, these will have an User object. + */ + return if (monitor.user == null) { + // fixme: discuss and remove hardcoded to settings? + // TODO: Remove "AmazonES_all_access" role? + settings.getAsList("", listOf("all_access", "AmazonES_all_access")) + } else { + monitor.user.roles + } + } + // TODO: Can this be updated to just use 'Instant.now()'? // 'threadPool.absoluteTimeInMillis()' is referring to a cached value of System.currentTimeMillis() that by default updates every 200ms private fun currentTime() = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) @@ -365,7 +532,40 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return true } - private suspend fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult { + // TODO: Add unit test for this method (or at least cover it in MonitorRunnerIT) + // Bucket-Level Monitors use the throttle configurations defined in ActionExecutionPolicy, this method evaluates that configuration. + private fun isBucketLevelTriggerActionThrottled(action: Action, alert: Alert): Boolean { + if (action.actionExecutionPolicy.throttle == null) return false + // TODO: This will need to be updated if throttleEnabled is moved to ActionExecutionPolicy + if (action.throttleEnabled) { + val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id } + val lastExecutionTime: Instant? = result?.lastExecutionTime + val throttledTimeBound = currentTime().minus( + action.actionExecutionPolicy.throttle.value.toLong(), + action.actionExecutionPolicy.throttle.unit + ) + return !(lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound)) + } + return false + } + + private fun getActionContextForAlertCategory( + alertCategory: AlertCategory, + alert: Alert, + ctx: BucketLevelTriggerExecutionContext, + error: Exception? + ): BucketLevelTriggerExecutionContext { + return when (alertCategory) { + AlertCategory.DEDUPED -> + ctx.copy(dedupedAlerts = listOf(alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) + AlertCategory.NEW -> + ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alert), completedAlerts = emptyList(), error = error) + AlertCategory.COMPLETED -> + ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error) + } + } + + private suspend fun runAction(action: Action, ctx: QueryLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult { return try { if (!isActionActionable(action, ctx.alert)) { return ActionRunResult(action.id, action.name, mapOf(), true, null, null) @@ -398,6 +598,38 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } } + // TODO: This is largely a duplicate of runAction above for BucketLevelTriggerExecutionContext for now. + // After suppression logic implementation, if this remains mostly the same, it can be refactored. + private suspend fun runAction(action: Action, ctx: BucketLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult { + return try { + val actionOutput = mutableMapOf() + actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else "" + actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx) + if (Strings.isNullOrEmpty(actionOutput[MESSAGE])) { + throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") + } + if (!dryrun) { + withContext(Dispatchers.IO) { + val destination = AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId) + if (!destination.isAllowed(allowList)) { + throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}") + } + + val destinationCtx = destinationContextFactory.getDestinationContext(destination) + actionOutput[MESSAGE_ID] = destination.publish( + actionOutput[SUBJECT], + actionOutput[MESSAGE]!!, + destinationCtx, + hostDenyList + ) + } + } + ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null) + } catch (e: Exception) { + ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e) + } + } + private fun compileTemplate(template: Script, ctx: TriggerExecutionContext): String { return scriptService.compile(template, TemplateScript.CONTEXT) .newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg())) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 7243b033c..799b33eac 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.ExecuteMonitorResponse import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.isBucketLevelMonitor import org.opensearch.client.Client import org.opensearch.common.inject.Inject import org.opensearch.common.xcontent.LoggingDeprecationHandler @@ -68,7 +69,11 @@ class TransportExecuteMonitorAction @Inject constructor( val (periodStart, periodEnd) = monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis)) try { - val monitorRunResult = runner.runMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) + val monitorRunResult = if (monitor.isBucketLevelMonitor()) { + runner.runBucketLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) + } else { + runner.runQueryLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) + } withContext(Dispatchers.IO) { actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 3563cc2bc..12fe034e7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -31,6 +31,7 @@ import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.alerting.model.AggregationResultBucket +import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.ActionExecutionScope @@ -154,3 +155,19 @@ fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinTo fun Action.getActionScope(): ActionExecutionScope.Type = this.actionExecutionPolicy.actionExecutionScope.getExecutionScope() + +fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult( + prevTriggerRunResult: BucketLevelTriggerRunResult? +): BucketLevelTriggerRunResult { + if (prevTriggerRunResult == null) return this + + // The aggregation results and action results across to two trigger run results should not have overlapping keys + // since they represent different pages of aggregations so a simple concatenation will combine them + val mergedAggregationResultBuckets = prevTriggerRunResult.aggregationResultBuckets + this.aggregationResultBuckets + val mergedActionResultsMap = (prevTriggerRunResult.actionResultsMap + this.actionResultsMap).toMutableMap() + + // Update to the most recent error if it's not null, otherwise keep the old one + val error = this.error ?: prevTriggerRunResult.error + + return this.copy(aggregationResultBuckets = mergedAggregationResultBuckets, actionResultsMap = mergedActionResultsMap, error = error) +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 79ccb09cf..9a62cf30f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -27,6 +27,7 @@ package org.opensearch.alerting import org.junit.Assert +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.alerts.AlertError import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.model.IntervalSchedule @@ -38,6 +39,8 @@ import org.opensearch.alerting.model.Alert.State.ACTIVE import org.opensearch.alerting.model.Alert.State.COMPLETED import org.opensearch.alerting.model.Alert.State.ERROR import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.action.ActionExecutionPolicy +import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.CustomWebhook import org.opensearch.alerting.model.destination.Destination @@ -51,6 +54,8 @@ import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.RestStatus import org.opensearch.script.Script +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder import org.opensearch.search.builder.SearchSourceBuilder import java.net.URLEncoder import java.time.Instant @@ -66,7 +71,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor with dryrun`() { val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) - val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) + val monitor = randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) val response = executeMonitor(monitor, params = DRYRUN_MONITOR) @@ -132,7 +139,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test active alert is updated on each run`() { val monitor = createMonitor( - randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) + randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id)) + ) ) executeMonitor(monitor.id) @@ -231,7 +240,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test acknowledged alert is not updated unnecessarily`() { val monitor = createMonitor( - randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) + randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id)) + ) ) executeMonitor(monitor.id) acknowledgeAlerts(monitor, searchAlerts(monitor).single()) @@ -286,7 +297,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute action template error`() { // Intentional syntax error in mustache template val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name")) - val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) + val monitor = randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) val response = executeMonitor(monitor) @@ -413,14 +426,19 @@ class MonitorRunnerIT : AlertingRestTestCase() { } fun `test monitor with one bad action and one good action`() { - val goodAction = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val goodAction = randomAction( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id + ) val syntaxErrorAction = randomAction( name = "bad syntax", template = randomTemplateScript("{{foo"), destinationId = createDestination().id ) val actions = listOf(goodAction, syntaxErrorAction) - val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = actions)))) + val monitor = createMonitor( + randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = actions))) + ) val output = entityAsMap(executeMonitor(monitor.id)) @@ -612,7 +630,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { putAlertMappings() val newMonitor = createMonitor( - randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN, actions = listOf(randomAction())))) + randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN, actions = listOf(randomAction()))) + ) ) val deleteNewMonitorResponse = client().makeRequest("DELETE", "$ALERTING_BASE_URI/${newMonitor.id}") @@ -751,7 +771,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) } - fun `test execute monitor with email destination creates alerts in error state`() { + fun `test execute monitor with email destination creates alert in error state`() { putAlertMappings() // Required as we do not have a create alert API. val emailAccount = createRandomEmailAccount() @@ -815,33 +835,30 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor with custom webhook destination and denied host`() { - // TODO: change to REST API call to test security enabled case - if (!securityEnabled()) { - listOf("http://10.1.1.1", "127.0.0.1").forEach { - val customWebhook = CustomWebhook(it, null, null, 80, null, "PUT", emptyMap(), emptyMap(), null, null) - val destination = createDestination( - Destination( - type = DestinationType.CUSTOM_WEBHOOK, - name = "testDesination", - user = randomUser(), - lastUpdateTime = Instant.now(), - chime = null, - slack = null, - customWebhook = customWebhook, - email = null - ) + listOf("http://10.1.1.1", "127.0.0.1").forEach { + val customWebhook = CustomWebhook(it, null, null, 80, null, "PUT", emptyMap(), emptyMap(), null, null) + val destination = createDestination( + Destination( + type = DestinationType.CUSTOM_WEBHOOK, + name = "testDesination", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = null, + slack = null, + customWebhook = customWebhook, + email = null ) - val action = randomAction(destinationId = destination.id) - val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) - executeMonitor(adminClient(), monitor.id) + ) + val action = randomAction(destinationId = destination.id) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) + executeMonitor(adminClient(), monitor.id) - val alerts = searchAlerts(monitor) - assertEquals("Alert not saved", 1, alerts.size) - verifyAlert(alerts.single(), monitor, ERROR) + val alerts = searchAlerts(monitor) + assertEquals("Alert not saved", 1, alerts.size) + verifyAlert(alerts.single(), monitor, ERROR) - Assert.assertTrue(alerts.single().errorMessage?.contains("The destination address is invalid") as Boolean) - } + Assert.assertTrue(alerts.single().errorMessage?.contains("The destination address is invalid") as Boolean) } } @@ -954,6 +971,269 @@ class MonitorRunnerIT : AlertingRestTestCase() { } } + // TODO: The composite aggregation will paginate through all results during the Bucket-Level Monitor run. + // The last page (when after_key is null) is empty if all the contents fit on the previous page, meaning the + // input results returned by the monitor execution is empty. + // Skipping this test for now until this is resolved to show a non-empty result. + fun `skip test execute bucket-level monitor returns search result`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + // print("Output is: $output") + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + } + + fun `test bucket-level monitor alert creation and completion`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + executeMonitor(monitor.id, params = DRYRUN_MONITOR) + + // Check created alerts + var alerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, alerts.size) + alerts.forEach { + verifyAlert(it, monitor, ACTIVE) + } + + // Delete documents of a particular value + deleteDataWithDocIds( + testIndex, + listOf( + "1", // test_value_1 + "2" // test_value_1 + ) + ) + + // Execute monitor again + executeMonitor(monitor.id, params = DRYRUN_MONITOR) + + // Verify expected alert was completed + alerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + val activeAlerts = alerts.filter { it.state == ACTIVE } + val completedAlerts = alerts.filter { it.state == COMPLETED } + assertEquals("Incorrect number of active alerts", 1, activeAlerts.size) + assertEquals("Incorrect number of completed alerts", 1, completedAlerts.size) + } + + @Suppress("UNCHECKED_CAST") + fun `test bucket-level monitor with one good action and one bad action`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", + "test_value_3", + "test_value_2", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Trigger script should only create Alerts for 'test_value_1' and 'test_value_2' + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + val goodAction = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val syntaxErrorAction = randomAction( + name = "bad syntax", + template = randomTemplateScript("{{foo"), + destinationId = createDestination().id + ) + val actions = listOf(goodAction, syntaxErrorAction) + + var trigger = randomBucketLevelTrigger(actions = actions) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + + val output = entityAsMap(executeMonitor(monitor.id)) + // The 'events' in this case are the bucketKeys hashes representing the Alert events + val expectedEvents = setOf("test_value_1", "test_value_2") + + assertEquals(monitor.name, output["monitor_name"]) + for (triggerResult in output.objectMap("trigger_results").values) { + for (alertEvent in triggerResult.objectMap("action_results")) { + assertTrue(expectedEvents.contains(alertEvent.key)) + val actionResults = alertEvent.value.values as Collection> + for (actionResult in actionResults) { + val actionOutput = actionResult["output"] as Map + if (actionResult["name"] == goodAction.name) { + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } else if (actionResult["name"] == syntaxErrorAction.name) { + assertTrue("Missing action error message", (actionResult["error"] as String).isNotEmpty()) + } else { + fail("Unknown action: ${actionResult["name"]}") + } + } + } + } + + // Check created alerts + val alerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, alerts.size) + alerts.forEach { + verifyAlert(it, monitor, ACTIVE) + } + } + + @Suppress("UNCHECKED_CAST") + fun `test bucket-level monitor with per execution action frequency`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", + "test_value_3", + "test_value_2", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Trigger script should only create Alerts for 'test_value_1' and 'test_value_2' + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + val action = randomAction( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + actionExecutionPolicy = ActionExecutionPolicy(null, PerExecutionActionScope()) + ) + var trigger = randomBucketLevelTrigger(actions = listOf(action)) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + + val output = entityAsMap(executeMonitor(monitor.id)) + // The 'events' in this case are the bucketKeys hashes representing the Alert events + val expectedEvents = setOf("test_value_1", "test_value_2") + + assertEquals(monitor.name, output["monitor_name"]) + for (triggerResult in output.objectMap("trigger_results").values) { + for (alertEvent in triggerResult.objectMap("action_results")) { + assertTrue(expectedEvents.contains(alertEvent.key)) + val actionResults = alertEvent.value.values as Collection> + for (actionResult in actionResults) { + val actionOutput = actionResult["output"] as Map + assertEquals("Unknown action: ${actionResult["name"]}", action.name, actionResult["name"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + // Check created alerts + val alerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, alerts.size) + alerts.forEach { + verifyAlert(it, monitor, ACTIVE) + } + } + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 09756e1e2..b3fe3455a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -25,6 +25,8 @@ */ package org.opensearch.alerting +import org.apache.http.Header +import org.apache.http.HttpEntity import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.alerting.core.model.Input @@ -35,9 +37,9 @@ import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket +import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult -import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult @@ -45,8 +47,8 @@ import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger import org.opensearch.alerting.model.action.Action -import org.opensearch.alerting.model.action.ActionExecutionScope import org.opensearch.alerting.model.action.ActionExecutionPolicy +import org.opensearch.alerting.model.action.ActionExecutionScope import org.opensearch.alerting.model.action.AlertCategory import org.opensearch.alerting.model.action.PerAlertActionScope import org.opensearch.alerting.model.action.PerExecutionActionScope @@ -55,9 +57,6 @@ import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup import org.opensearch.alerting.util.getBucketKeysHash -import org.opensearch.commons.authuser.User -import org.apache.http.Header -import org.apache.http.HttpEntity import org.opensearch.client.Request import org.opensearch.client.RequestOptions import org.opensearch.client.Response @@ -72,6 +71,7 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders import org.opensearch.script.Script import org.opensearch.script.ScriptType @@ -97,9 +97,11 @@ fun randomQueryLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + return Monitor( + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) } // Monitor of older versions without security. @@ -113,9 +115,11 @@ fun randomQueryLevelMonitorWithoutUser( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + return Monitor( + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) } fun randomBucketLevelMonitor( @@ -134,9 +138,11 @@ fun randomBucketLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor(name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + return Monitor( + name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) } fun randomQueryLevelTrigger( @@ -152,7 +158,8 @@ fun randomQueryLevelTrigger( name = name, severity = severity, condition = condition, - actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions) + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions + ) } fun randomBucketLevelTrigger( @@ -168,7 +175,8 @@ fun randomBucketLevelTrigger( name = name, severity = severity, bucketSelector = bucketSelector, - actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions) + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions + ) } fun randomBucketSelectorExtAggregationBuilder( @@ -247,13 +255,19 @@ fun randomThrottle( fun randomActionExecutionPolicy( throttle: Throttle = randomThrottle(), actionExecutionScope: ActionExecutionScope = randomActionExecutionFrequency() -) = ActionExecutionPolicy(throttle, actionExecutionScope) +): ActionExecutionPolicy { + return if (actionExecutionScope is PerExecutionActionScope) { + // Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it + ActionExecutionPolicy(null, actionExecutionScope) + } else { + ActionExecutionPolicy(throttle, actionExecutionScope) + } +} fun randomActionExecutionFrequency(): ActionExecutionScope { return if (randomBoolean()) { val alertCategories = AlertCategory.values() - PerAlertActionScope( - actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()) + PerAlertActionScope(actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()) } else { PerExecutionActionScope() } @@ -262,16 +276,24 @@ fun randomActionExecutionFrequency(): ActionExecutionScope { fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { val trigger = randomQueryLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) - return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, - actionExecutionResults = actionExecutionResults) + return Alert( + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults + ) } fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelMonitor()): Alert { val trigger = randomBucketLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) - return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, - actionExecutionResults = actionExecutionResults, aggregationResultBucket = AggregationResultBucket("parent_bucket_path_1", - listOf("bucket_key_1"), mapOf("k1" to "val1", "k2" to "val2"))) + return Alert( + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults, + aggregationResultBucket = AggregationResultBucket( + "parent_bucket_path_1", + listOf("bucket_key_1"), + mapOf("k1" to "val1", "k2" to "val2") + ) + ) } fun randomEmailAccountMethod(): EmailAccount.MethodType { @@ -332,17 +354,29 @@ fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { map.plus(Pair("key1", randomActionRunResult())) map.plus(Pair("key2", randomActionRunResult())) - val aggBucket1 = AggregationResultBucket("parent_bucket_path_1", listOf("bucket_key_1"), - mapOf("k1" to "val1", "k2" to "val2")) - val aggBucket2 = AggregationResultBucket("parent_bucket_path_2", listOf("bucket_key_2"), - mapOf("k1" to "val1", "k2" to "val2")) + val aggBucket1 = AggregationResultBucket( + "parent_bucket_path_1", + listOf("bucket_key_1"), + mapOf("k1" to "val1", "k2" to "val2") + ) + val aggBucket2 = AggregationResultBucket( + "parent_bucket_path_2", + listOf("bucket_key_2"), + mapOf("k1" to "val1", "k2" to "val2") + ) val actionResultsMap: MutableMap> = mutableMapOf() actionResultsMap[aggBucket1.getBucketKeysHash()] = map actionResultsMap[aggBucket2.getBucketKeysHash()] = map - return BucketLevelTriggerRunResult("trigger-name", null, - mapOf(aggBucket1.getBucketKeysHash() to aggBucket1, aggBucket2.getBucketKeysHash() to aggBucket2), actionResultsMap) + return BucketLevelTriggerRunResult( + "trigger-name", null, + mapOf( + aggBucket1.getBucketKeysHash() to aggBucket1, + aggBucket2.getBucketKeysHash() to aggBucket2 + ), + actionResultsMap + ) } fun randomActionRunResult(): ActionRunResult { @@ -361,8 +395,15 @@ fun Monitor.toJsonString(): String { } fun randomUser(): User { - return User(OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), - OpenSearchRestTestCase.randomAlphaOfLength(10)), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test")) + return User( + OpenSearchRestTestCase.randomAlphaOfLength(10), + listOf( + OpenSearchRestTestCase.randomAlphaOfLength(10), + OpenSearchRestTestCase.randomAlphaOfLength(10) + ), + listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), + listOf("test_attr=test") + ) } fun randomUserEmpty(): User { @@ -440,7 +481,9 @@ fun parser(xc: String): XContentParser { } fun xContentRegistry(): NamedXContentRegistry { - return NamedXContentRegistry(listOf( - SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY) + - SearchModule(Settings.EMPTY, false, emptyList()).namedXContents) + return NamedXContentRegistry( + listOf( + SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY + ) + SearchModule(Settings.EMPTY, false, emptyList()).namedXContents + ) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt index 8a925ac1d..15c15546c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt @@ -40,11 +40,11 @@ class BucketSelectorExtAggregationBuilderTests : BasePipelineAggregationTestCase params["foo"] = "bar" } val type = randomFrom(*ScriptType.values()) - script = - Script( - type, if (type == ScriptType.STORED) null else - randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), "script", params - ) + script = Script( + type, + if (type == ScriptType.STORED) null else randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), + "script", params + ) } val parentBucketPath = randomAlphaOfLengthBetween(3, 20) val filter = BucketSelectorExtFilter(IncludeExclude("foo.*", "bar.*")) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt index a767a6e5c..1a89bcbca 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt @@ -59,10 +59,13 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { val scriptEngine = MockScriptEngine( MockScriptEngine.NAME, - Collections.singletonMap(SCRIPTNAME, + Collections.singletonMap( + SCRIPTNAME, Function, Any> { script: Map -> script[paramName].toString().toDouble() == paramValue - }), emptyMap() + } + ), + emptyMap() ) val engines: Map = Collections.singletonMap(scriptEngine.type, scriptEngine) return ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS) @@ -103,11 +106,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilters -> assertThat( - (f.buckets[0].aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + (f.buckets[0].aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], CoreMatchers.equalTo(1) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) } @@ -163,11 +166,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilters -> assertThat( - (f.buckets[0].aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, + (f.buckets[0].aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, CoreMatchers.equalTo(0) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) testCase( @@ -184,11 +187,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilters -> assertThat( - (f.buckets[0].aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + (f.buckets[0].aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], CoreMatchers.equalTo(1) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) } @@ -227,11 +230,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilters -> assertThat( - (f.buckets[0].aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, + (f.buckets[0].aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, CoreMatchers.equalTo(0) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) } @@ -271,11 +274,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilters -> assertThat( - (f.buckets[0].aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + (f.buckets[0].aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], CoreMatchers.equalTo(0) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) } @@ -319,11 +322,11 @@ class BucketSelectorExtAggregatorTests : AggregatorTestCase() { }, Consumer { f: InternalFilter -> assertThat( - (f.aggregations - .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + (f.aggregations.get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], CoreMatchers.equalTo(1) ) - }, fieldType, fieldType1 + }, + fieldType, fieldType1 ) }