Skip execution of Actions on ACKNOWLEDGED Alerts for Bucket-Level Monitors #158

Merged · 1 commit · Aug 30, 2021
@@ -233,16 +233,15 @@ class AlertService(
val currentAlert = currentAlerts[aggAlertBucket.getBucketKeysHash()]
if (currentAlert != null) {
// De-duped Alert
- dedupedAlerts.add(currentAlert.copy(lastNotificationTime = currentTime, aggregationResultBucket = aggAlertBucket))
+ dedupedAlerts.add(currentAlert.copy(aggregationResultBucket = aggAlertBucket))

// Remove de-duped Alert from currentAlerts since it is no longer a candidate for a potentially completed Alert
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
- // TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
- lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
+ lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
)
@@ -266,7 +265,7 @@ class AlertService(
} ?: listOf()
}

- suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy) {
+ suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false) {
var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
@@ -281,7 +280,21 @@ class AlertService(
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
}
- Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
+ Alert.State.ACKNOWLEDGED -> {
+ // Allow ACKNOWLEDGED Alerts to be updated for Bucket-Level Monitors since de-duped Alerts can be ACKNOWLEDGED
+ // and updated by the MonitorRunner
+ if (allowUpdatingAcknowledgedAlert) {
+ listOf<DocWriteRequest<*>>(
+ IndexRequest(AlertIndices.ALERT_INDEX)
+ .routing(alert.monitorId)
+ .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
+ .id(if (alert.id != Alert.NO_ID) alert.id else null)
+ )
+ } else {
+ throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
+ }
+ }
+ Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
Alert.State.COMPLETED -> {
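Taken together, the AlertService changes above do two things: newly created and de-duped Alerts no longer get lastNotificationTime stamped before their Actions have actually run, and saveAlerts() only writes ACKNOWLEDGED Alerts when the caller opts in through the new allowUpdatingAcknowledgedAlert flag. Below is a minimal, self-contained sketch of that write gate; AlertState and checkWritable are illustrative stand-ins, not the plugin's Alert or AlertService API.

// Sketch of the saveAlerts() write gate added in this PR (stand-in types, not the plugin's classes).
enum class AlertState { ACTIVE, ERROR, ACKNOWLEDGED, COMPLETED, DELETED }

// Throws for the states saveAlerts() refuses to persist, mirroring the branching in the diff above.
fun checkWritable(state: AlertState, allowUpdatingAcknowledgedAlert: Boolean) {
    when (state) {
        AlertState.ACKNOWLEDGED ->
            // De-duped Alerts can already be ACKNOWLEDGED when the Bucket-Level MonitorRunner re-saves them,
            // so this state is only rejected when the caller has not opted in.
            if (!allowUpdatingAcknowledgedAlert) {
                throw IllegalStateException("Unexpected attempt to save $state alert")
            }
        AlertState.DELETED -> throw IllegalStateException("Unexpected attempt to save $state alert")
        else -> { /* ACTIVE, ERROR, and COMPLETED alerts are always writable */ }
    }
}

fun main() {
    // The Bucket-Level Monitor dedupe path opts in; the post-action save path does not.
    checkWritable(AlertState.ACKNOWLEDGED, allowUpdatingAcknowledgedAlert = true)
    runCatching { checkWritable(AlertState.ACKNOWLEDGED, allowUpdatingAcknowledgedAlert = false) }
        .onFailure { println("Rejected as expected: ${it.message}") }
}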
@@ -402,7 +402,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
* Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions
* will still execute with the Alert information in the ctx but the Alerts may not be visible.
*/
- alertService.saveAlerts(dedupedAlerts, retryPolicy)
+ alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true)
newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy)

// Store deduped and new Alerts to accumulate across pages
@@ -425,7 +425,11 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
- val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) ?: mutableListOf()
+ // Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
+ // New Alerts are ignored since they cannot be acknowledged yet.
+ val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
+ ?.filterNot { it.state == Alert.State.ACKNOWLEDGED }
+ ?: mutableListOf()
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

@@ -436,7 +440,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
- val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
+ var alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
+ // Filter out ACKNOWLEDGED Alerts from the deduped Alerts
+ if (alertCategory == AlertCategory.DEDUPED) {
+ alertsToExecuteActionsFor = alertsToExecuteActionsFor.filterNot { it.state == Alert.State.ACKNOWLEDGED }
+ .toMutableList()
+ }
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

@@ -483,15 +492,16 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap<String, ActionRunResult>())
alertService.updateActionResultsForBucketLevelAlert(
- alert,
+ alert.copy(lastNotificationTime = currentTime()),
actionResults,
// TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action
monitorResult.alertError() ?: triggerResult.alertError()
)
}

// Update Alerts with action execution results
- alertService.saveAlerts(updatedAlerts, retryPolicy)
+ // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them
+ alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
}

return monitorResult.copy(triggerResults = triggerResults)
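The MonitorRunner changes are the other half of the fix: de-duped Alerts that are already ACKNOWLEDGED are filtered out before per-alert Actions execute, and lastNotificationTime is now stamped only when action results are recorded. A small self-contained sketch of that behavior follows; SimpleAlert and SimpleState are illustrative stand-ins rather than the plugin's Alert model.

import java.time.Instant

// Stand-in types for illustration only; not the plugin's Alert/Alert.State classes.
enum class SimpleState { ACTIVE, ACKNOWLEDGED }
data class SimpleAlert(val id: String, val state: SimpleState, val lastNotificationTime: Instant? = null)

// Mirrors the filterNot { it.state == Alert.State.ACKNOWLEDGED } added above:
// only non-ACKNOWLEDGED de-duped Alerts have per-alert Actions executed for them.
fun alertsToRunActionsFor(dedupedAlerts: List<SimpleAlert>): List<SimpleAlert> =
    dedupedAlerts.filterNot { it.state == SimpleState.ACKNOWLEDGED }

// Mirrors alert.copy(lastNotificationTime = currentTime()): the notification timestamp is
// set when action results are recorded, not when the Alert is created or de-duped.
fun recordActionRun(alert: SimpleAlert): SimpleAlert = alert.copy(lastNotificationTime = Instant.now())

fun main() {
    val deduped = listOf(
        SimpleAlert("active-bucket", SimpleState.ACTIVE),
        SimpleAlert("acked-bucket", SimpleState.ACKNOWLEDGED)
    )
    val toRun = alertsToRunActionsFor(deduped)
    println(toRun.map { it.id })                                  // [active-bucket]
    println(recordActionRun(toRun.first()).lastNotificationTime)  // a non-null Instant
}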
@@ -47,6 +47,7 @@ import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.model.destination.email.Email
import org.opensearch.alerting.model.destination.email.Recipient
import org.opensearch.alerting.util.DestinationType
+ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.client.ResponseException
import org.opensearch.client.WarningFailureException
import org.opensearch.common.settings.Settings
@@ -1086,6 +1087,95 @@ class MonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Incorrect number of completed alerts", 1, completedAlerts.size)
}

fun `test bucket-level monitor with acknowledged alert`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
listOf(
"test_value_1",
"test_value_2"
)
)

val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
val triggerScript = """
params.docCount > 0
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null
)
)
val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
executeMonitor(monitor.id, params = DRYRUN_MONITOR)

// Check created Alerts
var currentAlerts = searchAlerts(monitor)
assertEquals("Alerts not saved", 2, currentAlerts.size)
currentAlerts.forEach {
verifyAlert(it, monitor, ACTIVE)
}

// Acknowledge one of the Alerts
val alertToAcknowledge = currentAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") }
acknowledgeAlerts(monitor, alertToAcknowledge)
currentAlerts = searchAlerts(monitor)
val acknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED }
val activeAlert = currentAlerts.single { it.state == ACTIVE }

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
executeMonitor(monitor.id, params = DRYRUN_MONITOR)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
currentAlerts = searchAlerts(monitor)
val currentAcknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED }
val currentActiveAlert = currentAlerts.single { it.state == ACTIVE }
assertEquals("Acknowledged alert was updated", acknowledgedAlert.lastNotificationTime, currentAcknowledgedAlert.lastNotificationTime)
assertTrue("Active alert was not updated", currentActiveAlert.lastNotificationTime!! > activeAlert.lastNotificationTime)

// Remove data so that both Alerts are moved into completed
deleteDataWithDocIds(
testIndex,
listOf(
"1", // test_value_1
"2" // test_value_2
)
)

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size)
val previouslyAcknowledgedAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") }
val previouslyActiveAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_2") }
assertTrue(
"Previously acknowledged alert was not updated when it moved to completed",
previouslyAcknowledgedAlert.lastNotificationTime!! > currentAcknowledgedAlert.lastNotificationTime
)
assertTrue(
"Previously active alert was not updated when it moved to completed",
previouslyActiveAlert.lastNotificationTime!! > currentActiveAlert.lastNotificationTime
)
}
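
The test above picks which Alert to acknowledge by comparing the aggregation bucket's key hash against the raw field value, which works because the composite aggregation has a single terms source on test_field. The snippet below is a toy illustration of that selection, under the assumption that getBucketKeysHash() joins the bucket key values (so with one key it reduces to the value itself); it uses stand-in types, not the plugin's AggregationResultBucket.

// Toy model of selecting a bucket-level Alert by its bucket keys; an assumption-based
// simplification of org.opensearch.alerting.util.getBucketKeysHash, not the real implementation.
data class FakeBucket(val bucketKeys: List<String>)

fun bucketKeysHash(bucket: FakeBucket): String = bucket.bucketKeys.joinToString(separator = "#")

fun main() {
    val buckets = listOf(FakeBucket(listOf("test_value_1")), FakeBucket(listOf("test_value_2")))
    // Mirrors: currentAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") }
    println(buckets.single { bucketKeysHash(it) == "test_value_1" })
}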

@Suppress("UNCHECKED_CAST")
fun `test bucket-level monitor with one good action and one bad action`() {
val testIndex = createTestIndex()