diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 809ac76ca..e18bc777f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -301,6 +301,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT, LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT, LegacyOpenDistroAlertingSettings.BULK_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index d956d029a..03c24a9d5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -64,6 +64,8 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS import org.opensearch.alerting.settings.DestinationSettings @@ -115,6 +117,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { @Volatile private lateinit var destinationSettings: Map 
@Volatile private lateinit var destinationContextFactory: DestinationContextFactory + @Volatile private var maxActionableAlertCount = DEFAULT_MAX_ACTIONABLE_ALERT_COUNT + private lateinit var runnerSupervisor: Job override val coroutineContext: CoroutineContext get() = Dispatchers.Default + runnerSupervisor @@ -190,6 +194,11 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // Host deny list is not a dynamic setting so no consumer is registered but the variable is set here hostDenyList = HOST_DENY_LIST.get(settings) + maxActionableAlertCount = MAX_ACTIONABLE_ALERT_COUNT.get(settings) + clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_ACTIONABLE_ALERT_COUNT) { + maxActionableAlertCount = it + } + return this } @@ -415,7 +424,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList()) /* - * Index de-duped and new Alerts here so they are available at the time the Actions are executed. + * Index de-duped and new Alerts here (if it's not a test Monitor) so they are available at the time the Actions are executed. * * The new Alerts have to be returned and saved back with their indexed doc ID to prevent duplicate documents * when the Alerts are updated again after Action execution. @@ -423,8 +432,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { * Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions * will still execute with the Alert information in the ctx but the Alerts may not be visible. 
*/ - alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true) - newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy) + if (!dryrun && monitor.id != Monitor.NO_ID) { + alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true) + newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy) + } // Store deduped and new Alerts to accumulate across pages if (!nextAlerts.containsKey(trigger.id)) { @@ -452,8 +463,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them. // New Alerts are ignored since they cannot be acknowledged yet. val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) - ?.filterNot { it.state == Alert.State.ACKNOWLEDGED } + ?.filterNot { it.state == Alert.State.ACKNOWLEDGED }?.toMutableList() ?: mutableListOf() + // Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution + nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts) val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() @@ -461,19 +474,17 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { val triggerCtx = triggerContexts[trigger.id]!! val triggerResult = triggerResults[trigger.id]!! val monitorOrTriggerError = monitorResult.error ?: triggerResult.error + val shouldDefaultToPerExecution = defaultToPerExecutionAction( + monitorId = monitor.id, + triggerId = trigger.id, + totalActionableAlertCount = dedupedAlerts.size + newAlerts.size + completedAlerts.size, + monitorOrTriggerError = monitorOrTriggerError + ) for (action in trigger.actions) { - // If the monitor or triggerResult has an error, then default to PER_EXECUTION to communicate the error. 
- // Typically, given the actions taken by runBucketLevelTrigger, an exception during the operation could mean - // either there were incompatible trigger conditions or there was a parsing error on the results. - if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) { + if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) { val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope for (alertCategory in perAlertActionFrequency.actionableAlerts) { - var alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() - // Filter out ACKNOWLEDGED Alerts from the deduped Alerts - if (alertCategory == AlertCategory.DEDUPED) { - alertsToExecuteActionsFor = alertsToExecuteActionsFor.filterNot { it.state == Alert.State.ACKNOWLEDGED } - .toMutableList() - } + val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { if (isBucketLevelTriggerActionThrottled(action, alert)) continue @@ -491,7 +502,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { alertsToUpdate.add(alert) } } - } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || monitorOrTriggerError != null) { + } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) { // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action. // If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified. 
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue @@ -533,14 +544,48 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { ) } - // Update Alerts with action execution results - // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them - alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false) + // Update Alerts with action execution results (if it's not a test Monitor). + // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them. + if (!dryrun && monitor.id != Monitor.NO_ID) { + alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false) + } } return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults) } + private fun defaultToPerExecutionAction( + monitorId: String, + triggerId: String, + totalActionableAlertCount: Int, + monitorOrTriggerError: Exception? + ): Boolean { + // If the monitor or triggerResult has an error, then also default to PER_EXECUTION to communicate the error + if (monitorOrTriggerError != null) { + logger.debug( + "Trigger [$triggerId] in monitor [$monitorId] encountered an error. Defaulting to " + + "[${ActionExecutionScope.Type.PER_EXECUTION}] for action execution to communicate error." 
+ ) + return true + } + + // If the MAX_ACTIONABLE_ALERT_COUNT is set to -1, consider it unbounded and proceed regardless of actionable Alert count + if (maxActionableAlertCount < 0) return false + + // If the total number of Alerts to execute Actions on exceeds the MAX_ACTIONABLE_ALERT_COUNT setting then default to + // PER_EXECUTION for less intrusive Actions + if (totalActionableAlertCount > maxActionableAlertCount) { + logger.debug( + "The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " + + "which exceeds the maximum of [$maxActionableAlertCount]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " + + "for action execution." + ) + return true + } + + return false + } + private fun getRolesForMonitor(monitor: Monitor): List { /* * We need to handle 3 cases: diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index f00a43aa2..c314770dd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -38,6 +38,7 @@ class AlertingSettings { const val MONITOR_MAX_INPUTS = 1 const val MONITOR_MAX_TRIGGERS = 10 + const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -134,5 +135,12 @@ class AlertingSettings { LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val MAX_ACTIONABLE_ALERT_COUNT = Setting.longSetting( + "plugins.alerting.max_actionable_alert_count", + DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, + -1L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 
8b48d7394..59ebf6eb5 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -972,10 +972,6 @@ class MonitorRunnerIT : AlertingRestTestCase() { } } - // TODO: The composite aggregation will paginate through all results during the Bucket-Level Monitor run. - // The last page (when after_key is null) is empty if all the contents fit on the previous page, meaning the - // input results returned by the monitor execution is empty. - // Skipping this test for now until this is resolved to show a non-empty result. fun `test execute bucket-level monitor returns search result`() { val testIndex = createTestIndex() insertSampleTimeSerializedData( @@ -1058,7 +1054,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) ) val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) - executeMonitor(monitor.id, params = DRYRUN_MONITOR) + executeMonitor(monitor.id) // Check created alerts var alerts = searchAlerts(monitor) @@ -1077,7 +1073,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) // Execute monitor again - executeMonitor(monitor.id, params = DRYRUN_MONITOR) + executeMonitor(monitor.id) // Verify expected alert was completed alerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) @@ -1121,7 +1117,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) ) val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) - executeMonitor(monitor.id, params = DRYRUN_MONITOR) + executeMonitor(monitor.id) // Check created Alerts var currentAlerts = searchAlerts(monitor) @@ -1140,7 +1136,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let lastNotificationTime change. W/o this sleep the test can result in a false negative. 
Thread.sleep(200) - executeMonitor(monitor.id, params = DRYRUN_MONITOR) + executeMonitor(monitor.id) // Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was currentAlerts = searchAlerts(monitor) @@ -1160,7 +1156,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { // Execute Monitor and check that both Alerts were updated Thread.sleep(200) - executeMonitor(monitor.id, params = DRYRUN_MONITOR) + executeMonitor(monitor.id) currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) val completedAlerts = currentAlerts.filter { it.state == COMPLETED } assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size)