diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index b1957e0ee..1b47d2017 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -385,6 +385,17 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { val triggerResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx) triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id]) + /* + * If an error was encountered when running the trigger, it means that something went wrong when parsing the input results + * for the filtered buckets returned from the pipeline bucket selector injected into the input query. + * + * In this case, the returned aggregation result buckets are empty so the categorization of the Alerts that happens below + * should be skipped/invalidated since comparing the current Alerts to an empty result will lead the execution to believe + * that all Alerts have been COMPLETED. Not doing so would mean it would not be possible to propagate the error into the + * existing Alerts in a way the user can easily view them since they will have all been moved to the history index. + */ + if (triggerResults[trigger.id]?.error != null) continue + // TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can // be refactored to use a map instead val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor( @@ -418,9 +429,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } } while (monitorResult.inputResults.afterKeysPresent()) - // The completed Alerts are whatever are left in the currentAlerts + // The completed Alerts are whatever are left in the currentAlerts. + // However, this operation will only be done if there was no trigger error, since otherwise the nextAlerts were not collected + // in favor of just using the currentAlerts as-is. currentAlerts.forEach { (trigger, keysToAlertsMap) -> - nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap)) + if (triggerResults[trigger.id]?.error == null) + nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap)) } for (trigger in monitor.triggers) { @@ -432,8 +446,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop val triggerCtx = triggerContexts[trigger.id]!! val triggerResult = triggerResults[trigger.id]!! + val monitorOrTriggerError = monitorResult.error ?: triggerResult.error for (action in trigger.actions) { - if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) { + // If the monitor or triggerResult has an error, then default to PER_EXECUTION to communicate the error. + // Typically, given the actions taken by runBucketLevelTrigger, an exception during the operation could mean + // either there were incompatible trigger conditions or there was a parsing error on the results. + if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) { val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope for (alertCategory in perAlertActionFrequency.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() @@ -441,7 +459,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { if (isBucketLevelTriggerActionThrottled(action, alert)) continue val actionCtx = getActionContextForAlertCategory( - alertCategory, alert, triggerCtx, monitorResult.error ?: triggerResult.error + alertCategory, alert, triggerCtx, monitorOrTriggerError ) // AggregationResultBucket should not be null here val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() @@ -454,9 +472,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { alertsToUpdate.add(alert) } } - } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION) { - // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action - if (dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue + } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || monitorOrTriggerError != null) { + // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action. + // If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified. + if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue val actionCtx = triggerCtx.copy( dedupedAlerts = dedupedAlerts, @@ -465,8 +484,13 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { error = monitorResult.error ?: triggerResult.error ) val actionResult = runAction(action, actionCtx, dryrun) + // If there was an error during trigger execution then the Alerts to be updated are the current Alerts since the state + // was not changed. Otherwise, the Alerts to be updated are the sum of the deduped, new and completed Alerts. + val alertsToIterate = if (monitorOrTriggerError == null) { + (dedupedAlerts + newAlerts + completedAlerts) + } else currentAlerts[trigger]?.map { it.value } ?: listOf() // Save the Action run result for every Alert - for (alert in (dedupedAlerts + newAlerts + completedAlerts)) { + for (alert in alertsToIterate) { val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) { triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index bb92ba7ea..6fda9e107 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -88,8 +88,7 @@ class TriggerService(val scriptService: ScriptService) { } BucketLevelTriggerRunResult(trigger.name, null, selectedBuckets) } catch (e: Exception) { - logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) - // TODO empty map here with error should be treated in the same way as QueryLevelTrigger with error running script + logger.info("Error running trigger [${trigger.id}] for monitor [${monitor.id}]", e) BucketLevelTriggerRunResult(trigger.name, e, emptyMap()) } }