diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 6b1e848e9..197c5892c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -233,16 +233,15 @@ class AlertService( val currentAlert = currentAlerts[aggAlertBucket.getBucketKeysHash()] if (currentAlert != null) { // De-duped Alert - dedupedAlerts.add(currentAlert.copy(lastNotificationTime = currentTime, aggregationResultBucket = aggAlertBucket)) + dedupedAlerts.add(currentAlert.copy(aggregationResultBucket = aggAlertBucket)) // Remove de-duped Alert from currentAlerts since it is no longer a candidate for a potentially completed Alert currentAlerts.remove(aggAlertBucket.getBucketKeysHash()) } else { // New Alert - // TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here val newAlert = Alert( monitor = monitor, trigger = trigger, startTime = currentTime, - lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null, + lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null, errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(), schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket ) @@ -266,7 +265,7 @@ class AlertService( } ?: listOf() } - suspend fun saveAlerts(alerts: List, retryPolicy: BackoffPolicy) { + suspend fun saveAlerts(alerts: List, retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false) { var requestsToRetry = alerts.flatMap { alert -> // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts. // In the rare event that a user acknowledges an alert between when it's read and when it's written @@ -281,7 +280,21 @@ class AlertService( .id(if (alert.id != Alert.NO_ID) alert.id else null) ) } - Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> { + Alert.State.ACKNOWLEDGED -> { + // Allow ACKNOWLEDGED Alerts to be updated for Bucket-Level Monitors since de-duped Alerts can be ACKNOWLEDGED + // and updated by the MonitorRunner + if (allowUpdatingAcknowledgedAlert) { + listOf>( + IndexRequest(AlertIndices.ALERT_INDEX) + .routing(alert.monitorId) + .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .id(if (alert.id != Alert.NO_ID) alert.id else null) + ) + } else { + throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert") + } + } + Alert.State.DELETED -> { throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert") } Alert.State.COMPLETED -> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index b1957e0ee..aae3f6b9d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -402,7 +402,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { * Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions * will still execute with the Alert information in the ctx but the Alerts may not be visible. */ - alertService.saveAlerts(dedupedAlerts, retryPolicy) + alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true) newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy) // Store deduped and new Alerts to accumulate across pages @@ -425,7 +425,11 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { for (trigger in monitor.triggers) { val alertsToUpdate = mutableSetOf() - val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) ?: mutableListOf() + // Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them. + // New Alerts are ignored since they cannot be acknowledged yet. + val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) + ?.filterNot { it.state == Alert.State.ACKNOWLEDGED } + ?: mutableListOf() val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() @@ -436,7 +440,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) { val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope for (alertCategory in perAlertActionFrequency.actionableAlerts) { - val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() + var alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() + // Filter out ACKNOWLEDGED Alerts from the deduped Alerts + if (alertCategory == AlertCategory.DEDUPED) { + alertsToExecuteActionsFor = alertsToExecuteActionsFor.filterNot { it.state == Alert.State.ACKNOWLEDGED } + .toMutableList() + } for (alert in alertsToExecuteActionsFor) { if (isBucketLevelTriggerActionThrottled(action, alert)) continue @@ -483,7 +492,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap()) alertService.updateActionResultsForBucketLevelAlert( - alert, + alert.copy(lastNotificationTime = currentTime()), actionResults, // TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action monitorResult.alertError() ?: triggerResult.alertError() @@ -491,7 +500,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } // Update Alerts with action execution results - alertService.saveAlerts(updatedAlerts, retryPolicy) + // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them + alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false) } return monitorResult.copy(triggerResults = triggerResults) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 9a62cf30f..22974fd56 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -47,6 +47,7 @@ import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.model.destination.email.Email import org.opensearch.alerting.model.destination.email.Recipient import org.opensearch.alerting.util.DestinationType +import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException import org.opensearch.common.settings.Settings @@ -1086,6 +1087,95 @@ class MonitorRunnerIT : AlertingRestTestCase() { assertEquals("Incorrect number of completed alerts", 1, completedAlerts.size) } + fun `test bucket-level monitor with acknowledged alert`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + executeMonitor(monitor.id, params = DRYRUN_MONITOR) + + // Check created Alerts + var currentAlerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, currentAlerts.size) + currentAlerts.forEach { + verifyAlert(it, monitor, ACTIVE) + } + + // Acknowledge one of the Alerts + val alertToAcknowledge = currentAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") } + acknowledgeAlerts(monitor, alertToAcknowledge) + currentAlerts = searchAlerts(monitor) + val acknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED } + val activeAlert = currentAlerts.single { it.state == ACTIVE } + + // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to + // let lastNotificationTime change. W/o this sleep the test can result in a false negative. + Thread.sleep(200) + executeMonitor(monitor.id, params = DRYRUN_MONITOR) + + // Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was + currentAlerts = searchAlerts(monitor) + val currentAcknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED } + val currentActiveAlert = currentAlerts.single { it.state == ACTIVE } + assertEquals("Acknowledged alert was updated", acknowledgedAlert.lastNotificationTime, currentAcknowledgedAlert.lastNotificationTime) + assertTrue("Active alert was not updated", currentActiveAlert.lastNotificationTime!! > activeAlert.lastNotificationTime) + + // Remove data so that both Alerts are moved into completed + deleteDataWithDocIds( + testIndex, + listOf( + "1", // test_value_1 + "2" // test_value_2 + ) + ) + + // Execute Monitor and check that both Alerts were updated + Thread.sleep(200) + executeMonitor(monitor.id, params = DRYRUN_MONITOR) + currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + val completedAlerts = currentAlerts.filter { it.state == COMPLETED } + assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size) + val previouslyAcknowledgedAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") } + val previouslyActiveAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_2") } + assertTrue( + "Previously acknowledged alert was not updated when it moved to completed", + previouslyAcknowledgedAlert.lastNotificationTime!! > currentAcknowledgedAlert.lastNotificationTime + ) + assertTrue( + "Previously active alert was not updated when it moved to completed", + previouslyActiveAlert.lastNotificationTime!! > currentActiveAlert.lastNotificationTime + ) + } + @Suppress("UNCHECKED_CAST") fun `test bucket-level monitor with one good action and one bad action`() { val testIndex = createTestIndex()