Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting to limit per-alert Action executions and don't save Alerts for test Bucket-Level Monitors #161

Merged
merged 1 commit into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
81 changes: 63 additions & 18 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -115,6 +117,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
@Volatile private lateinit var destinationSettings: Map<String, DestinationSettings.Companion.SecureDestinationSettings>
@Volatile private lateinit var destinationContextFactory: DestinationContextFactory

@Volatile private var maxActionableAlertCount = DEFAULT_MAX_ACTIONABLE_ALERT_COUNT

private lateinit var runnerSupervisor: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor
Expand Down Expand Up @@ -190,6 +194,11 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// Host deny list is not a dynamic setting so no consumer is registered but the variable is set here
hostDenyList = HOST_DENY_LIST.get(settings)

maxActionableAlertCount = MAX_ACTIONABLE_ALERT_COUNT.get(settings)
clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_ACTIONABLE_ALERT_COUNT) {
maxActionableAlertCount = it
}

return this
}

Expand Down Expand Up @@ -415,16 +424,18 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())

/*
* Index de-duped and new Alerts here so they are available at the time the Actions are executed.
* Index de-duped and new Alerts here (if it's not a test Monitor) so they are available at the time the Actions are executed.
*
* The new Alerts have to be returned and saved back with their indexed doc ID to prevent duplicate documents
* when the Alerts are updated again after Action execution.
*
* Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions
* will still execute with the Alert information in the ctx but the Alerts may not be visible.
*/
alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true)
newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy)
if (!dryrun && monitor.id != Monitor.NO_ID) {
alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true)
newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy)
}

// Store deduped and new Alerts to accumulate across pages
if (!nextAlerts.containsKey(trigger.id)) {
Expand Down Expand Up @@ -452,28 +463,28 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
// New Alerts are ignored since they cannot be acknowledged yet.
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
?.filterNot { it.state == Alert.State.ACKNOWLEDGED }
?.filterNot { it.state == Alert.State.ACKNOWLEDGED }?.toMutableList()
?: mutableListOf()
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
val monitorOrTriggerError = monitorResult.error ?: triggerResult.error
val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorId = monitor.id,
triggerId = trigger.id,
totalActionableAlertCount = dedupedAlerts.size + newAlerts.size + completedAlerts.size,
monitorOrTriggerError = monitorOrTriggerError
)
for (action in trigger.actions) {
// If the monitor or triggerResult has an error, then default to PER_EXECUTION to communicate the error.
// Typically, given the actions taken by runBucketLevelTrigger, an exception during the operation could mean
// either there were incompatible trigger conditions or there was a parsing error on the results.
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
var alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
// Filter out ACKNOWLEDGED Alerts from the deduped Alerts
if (alertCategory == AlertCategory.DEDUPED) {
alertsToExecuteActionsFor = alertsToExecuteActionsFor.filterNot { it.state == Alert.State.ACKNOWLEDGED }
.toMutableList()
}
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

Expand All @@ -491,7 +502,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
alertsToUpdate.add(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || monitorOrTriggerError != null) {
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue
Expand Down Expand Up @@ -533,14 +544,48 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
)
}

// Update Alerts with action execution results
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
// Update Alerts with action execution results (if it's not a test Monitor).
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
}
}

return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

/**
 * Decides whether a PER_ALERT scoped Action should fall back to PER_EXECUTION semantics for this run.
 *
 * Falls back when either:
 *  1. The Monitor or Trigger execution produced an error (so the user is notified of the failure), or
 *  2. The number of actionable Alerts exceeds the [MAX_ACTIONABLE_ALERT_COUNT] setting
 *     (a negative setting value is treated as unbounded).
 *
 * @param monitorId ID of the Monitor being executed, used for log context only.
 * @param triggerId ID of the Trigger being evaluated, used for log context only.
 * @param totalActionableAlertCount combined count of deduped, new and completed Alerts for the Trigger.
 * @param monitorOrTriggerError error surfaced by the Monitor run or Trigger evaluation, if any.
 * @return true if Actions should default to PER_EXECUTION, false to honor PER_ALERT scope.
 */
private fun defaultToPerExecutionAction(
    monitorId: String,
    triggerId: String,
    totalActionableAlertCount: Int,
    monitorOrTriggerError: Exception?
): Boolean {
    // An execution error overrides the configured scope so the failure is communicated to the user.
    if (monitorOrTriggerError != null) {
        logger.debug(
            "Trigger [$triggerId] in monitor [$monitorId] encountered an error. Defaulting to " +
                "[${ActionExecutionScope.Type.PER_EXECUTION}] for action execution to communicate error."
        )
        return true
    }

    val alertLimit = maxActionableAlertCount
    // A negative limit (e.g. -1) means the actionable Alert count is unbounded.
    if (alertLimit < 0) return false

    // Too many actionable Alerts: default to PER_EXECUTION for less intrusive Actions.
    val exceedsLimit = totalActionableAlertCount > alertLimit
    if (exceedsLimit) {
        logger.debug(
            "The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " +
                "which exceeds the maximum of [$alertLimit]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " +
                "for action execution."
        )
    }
    return exceedsLimit
}

private fun getRolesForMonitor(monitor: Monitor): List<String> {
/*
* We need to handle 3 cases:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AlertingSettings {

const val MONITOR_MAX_INPUTS = 1
const val MONITOR_MAX_TRIGGERS = 10
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand Down Expand Up @@ -134,5 +135,12 @@ class AlertingSettings {
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

// Upper bound on the number of actionable Alerts a PER_ALERT scoped Action will execute against in a
// single Monitor run; when exceeded, Action execution defaults to PER_EXECUTION. A value of -1 (the
// setting's minimum) disables the limit. Dynamic, so it can be updated without a node restart.
val MAX_ACTIONABLE_ALERT_COUNT = Setting.longSetting(
"plugins.alerting.max_actionable_alert_count",
DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
-1L,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -972,10 +972,6 @@ class MonitorRunnerIT : AlertingRestTestCase() {
}
}

// TODO: The composite aggregation will paginate through all results during the Bucket-Level Monitor run.
// The last page (when after_key is null) is empty if all the contents fit on the previous page, meaning the
// input results returned by the monitor execution is empty.
// Skipping this test for now until this is resolved to show a non-empty result.
fun `test execute bucket-level monitor returns search result`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
Expand Down Expand Up @@ -1058,7 +1054,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
)
)
val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
executeMonitor(monitor.id)

// Check created alerts
var alerts = searchAlerts(monitor)
Expand All @@ -1077,7 +1073,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
)

// Execute monitor again
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
executeMonitor(monitor.id)

// Verify expected alert was completed
alerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN)
Expand Down Expand Up @@ -1121,7 +1117,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
)
)
val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
executeMonitor(monitor.id)

// Check created Alerts
var currentAlerts = searchAlerts(monitor)
Expand All @@ -1140,7 +1136,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
executeMonitor(monitor.id)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
currentAlerts = searchAlerts(monitor)
Expand All @@ -1160,7 +1156,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
executeMonitor(monitor.id, params = DRYRUN_MONITOR)
executeMonitor(monitor.id)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size)
Expand Down