From 837cca77f841de867bc781d0d274380fc9254996 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 3 Aug 2023 15:16:16 -0700 Subject: [PATCH 1/3] chained alert behavior changes Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertService.kt | 142 ++++++++++++++++-- .../org/opensearch/alerting/TriggerService.kt | 10 ++ .../ChainedAlertTriggerExecutionContext.kt | 10 +- .../TransportGetWorkflowAlertsAction.kt | 10 +- .../workflow/CompositeWorkflowRunner.kt | 108 +++++++------ .../alerting/workflow/WorkflowRunner.kt | 6 +- .../alerting/MonitorDataSourcesIT.kt | 83 ++++++++++ 7 files changed, 309 insertions(+), 60 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 04424753a..2a69aea62 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -19,6 +19,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.ChainedAlertTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.firstFailureOrNull import org.opensearch.alerting.opensearchapi.retry @@ -83,6 +84,26 @@ class AlertService( private val logger = LogManager.getLogger(AlertService::class.java) + suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map { + val searchAlertsResponse: SearchResponse = searchAlerts( + workflow = workflow, + size = workflow.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check + dataSources = dataSources + ) + + val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } + .groupBy { it.triggerId } + foundAlerts.values.forEach { alerts -> + if (alerts.size > 1) { + logger.warn("Found multiple alerts for same trigger: $alerts") + } + } + + return workflow.triggers.associateWith { trigger -> + foundAlerts[trigger.id]?.firstOrNull() + } + } + suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map { val searchAlertsResponse: SearchResponse = searchAlerts( monitor = monitor, @@ -257,18 +278,84 @@ class AlertService( ctx: ChainedAlertTriggerExecutionContext, executionId: String, workflow: Workflow, - associatedAlertIds: List - ): Alert { - return Alert( - startTime = Instant.now(), - lastNotificationTime = Instant.now(), - state = Alert.State.ACTIVE, - errorMessage = null, schemaVersion = -1, - chainedAlertTrigger = ctx.trigger, - executionId = executionId, - workflow = workflow, - associatedAlertIds = associatedAlertIds - ) + associatedAlertIds: List, + result: ChainedAlertTriggerRunResult, + alertError: AlertError? = null, + ): Alert? { + + val currentTime = Instant.now() + val currentAlert = ctx.alert + + val updatedActionExecutionResults = mutableListOf() + val currentActionIds = mutableSetOf() + if (currentAlert != null) { + // update current alert's action execution results + for (actionExecutionResult in currentAlert.actionExecutionResults) { + val actionId = actionExecutionResult.actionId + currentActionIds.add(actionId) + val actionRunResult = result.actionResults[actionId] + when { + actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult) + actionRunResult.throttled -> + updatedActionExecutionResults.add( + actionExecutionResult.copy( + throttledCount = actionExecutionResult.throttledCount + 1 + ) + ) + + else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime)) + } + } + // add action execution results which not exist in current alert + updatedActionExecutionResults.addAll( + result.actionResults.filter { !currentActionIds.contains(it.key) } + .map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) } + ) + } else { + updatedActionExecutionResults.addAll( + result.actionResults.map { + ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) + } + ) + } + + // Merge the alert's error message to the current alert's history + val updatedHistory = currentAlert?.errorHistory.update(alertError) + return if (alertError == null && !result.triggered) { + currentAlert?.copy( + state = Alert.State.COMPLETED, + endTime = currentTime, + errorMessage = null, + errorHistory = updatedHistory, + actionExecutionResults = updatedActionExecutionResults, + schemaVersion = IndexUtils.alertIndexSchemaVersion + ) + } else if (alertError == null && currentAlert?.isAcknowledged() == true) { + null + } else if (currentAlert != null) { + val alertState = Alert.State.ACTIVE + currentAlert.copy( + state = alertState, + lastNotificationTime = currentTime, + errorMessage = alertError?.message, + errorHistory = updatedHistory, + actionExecutionResults = updatedActionExecutionResults, + schemaVersion = IndexUtils.alertIndexSchemaVersion, + ) + } else { + if (alertError == null) Alert.State.ACTIVE + else Alert.State.ERROR + Alert( + startTime = Instant.now(), + lastNotificationTime = Instant.now(), + state = Alert.State.ACTIVE, + errorMessage = null, schemaVersion = -1, + chainedAlertTrigger = ctx.trigger, + executionId = executionId, + workflow = workflow, + associatedAlertIds = associatedAlertIds + ) + } } fun updateActionResultsForBucketLevelAlert( @@ -758,6 +845,37 @@ class AlertService( return searchResponse } + /** + * Searches for Alerts in the monitor's alertIndex. + * + * @param monitorId The Monitor to get Alerts for + * @param size The number of search hits (Alerts) to return + */ + private suspend fun searchAlerts( + workflow: Workflow, + size: Int, + dataSources: DataSources, + ): SearchResponse { + val workflowId = workflow.id + val alertIndex = dataSources.alertsIndex + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId)) + .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, "")) + val searchSourceBuilder = SearchSourceBuilder() + .size(size) + .query(queryBuilder) + + val searchRequest = SearchRequest(alertIndex) + .routing(workflowId) + .source(searchSourceBuilder) + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + if (searchResponse.status() != RestStatus.OK) { + throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts")) + } + return searchResponse + } + private fun List?.update(alertError: AlertError?): List { return when { this == null && alertError == null -> emptyList() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index 8c64e43be..f2356eddf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser @@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) { return result.triggered && !suppress } + fun isChainedAlertTriggerActionable( + ctx: ChainedAlertTriggerExecutionContext, + result: ChainedAlertTriggerRunResult, + ): Boolean { + // Suppress actions if the current alert is acknowledged and there are no errors. + val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null + return result.triggered && !suppress + } + fun runQueryLevelTrigger( monitor: Monitor, trigger: QueryLevelTrigger, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt index a626c7667..a05b8c4bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.script import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.Workflow import java.time.Instant @@ -18,7 +19,8 @@ data class ChainedAlertTriggerExecutionContext( val error: Exception? = null, val trigger: ChainedAlertTrigger, val alertGeneratingMonitors: Set, - val monitorIdToAlertIdsMap: Map> + val monitorIdToAlertIdsMap: Map>, + val alert: Alert? = null ) { constructor( @@ -26,7 +28,8 @@ data class ChainedAlertTriggerExecutionContext( workflowRunResult: WorkflowRunResult, trigger: ChainedAlertTrigger, alertGeneratingMonitors: Set, - monitorIdToAlertIdsMap: Map> + monitorIdToAlertIdsMap: Map>, + alert: Alert? = null ) : this( workflow, @@ -36,7 +39,8 @@ data class ChainedAlertTriggerExecutionContext( workflowRunResult.error, trigger, alertGeneratingMonitors, - monitorIdToAlertIdsMap + monitorIdToAlertIdsMap, + alert ) /** diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt index b5a98015f..2d6c165c0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt @@ -157,8 +157,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor( } fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String { - return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX - else getAlertsRequest.alertIndex!! + var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN + if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) { + alertIndex = getAlertsRequest.alertIndex!! + } + return if (alertIndex == AlertIndices.ALERT_INDEX) + AlertIndices.ALL_ALERT_INDEX_PATTERN + else + alertIndex } fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index a4cee80d0..ffbdce821 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -150,36 +150,67 @@ object CompositeWorkflowRunner : WorkflowRunner() { error = lastErrorDelegateRun, triggerResults = triggerResults ) - if (dataSources != null) { - try { - monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources) - val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow) - for (trigger in workflow.triggers) { - val caTrigger = trigger as ChainedAlertTrigger - val triggerCtx = ChainedAlertTriggerExecutionContext( - workflow = workflow, - workflowRunResult = workflowRunResult, - trigger = caTrigger, - alertGeneratingMonitors = monitorIdToAlertIdsMap.keys, - monitorIdToAlertIdsMap = monitorIdToAlertIdsMap + val currentAlerts = try { + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources!!) + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(dataSources) + monitorCtx.alertService!!.loadCurrentAlertsForWorkflow(workflow, dataSources) + } catch (e: Exception) { + logger.error("Failed to fetch current alerts for workflow", e) + // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts + val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id + logger.error("Error loading alerts for workflow: $id", e) + return workflowRunResult.copy(error = e) + } + try { + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources) + val updatedAlerts = mutableListOf() + val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow) + for (trigger in workflow.triggers) { + val currentAlert = currentAlerts[trigger] + val caTrigger = trigger as ChainedAlertTrigger + val triggerCtx = ChainedAlertTriggerExecutionContext( + workflow = workflow, + workflowRunResult = workflowRunResult, + trigger = caTrigger, + alertGeneratingMonitors = monitorIdToAlertIdsMap.keys, + monitorIdToAlertIdsMap = monitorIdToAlertIdsMap, + alert = currentAlert + ) + runChainedAlertTrigger( + monitorCtx, + workflow, + trigger, + executionId, + triggerCtx, + dryRun, + triggerResults, + updatedAlerts + ) + } + if (!dryRun && workflow.id != Workflow.NO_ID && updatedAlerts.isNotEmpty()) { + monitorCtx.retryPolicy?.let { + monitorCtx.alertService!!.saveAlerts( + dataSources, + updatedAlerts, + it, + routingId = workflow.id ) - runChainedAlertTrigger(dataSources, monitorCtx, workflow, trigger, executionId, triggerCtx, dryRun, triggerResults) } - } catch (e: Exception) { - // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts - val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id - logger.error("Error loading current chained alerts for workflow: $id", e) - return WorkflowRunResult( - workflowId = workflow.id, - workflowName = workflow.name, - monitorRunResults = emptyList(), - executionStartTime = workflowExecutionStartTime, - executionEndTime = Instant.now(), - executionId = executionId, - error = AlertingException.wrap(e), - triggerResults = emptyMap() - ) } + } catch (e: Exception) { + // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts + val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id + logger.error("Error loading current chained alerts for workflow: $id", e) + return WorkflowRunResult( + workflowId = workflow.id, + workflowName = workflow.name, + monitorRunResults = emptyList(), + executionStartTime = workflowExecutionStartTime, + executionEndTime = Instant.now(), + executionId = executionId, + error = AlertingException.wrap(e), + triggerResults = emptyMap() + ) } workflowRunResult.executionEndTime = Instant.now() @@ -260,7 +291,6 @@ object CompositeWorkflowRunner : WorkflowRunner() { } private suspend fun runChainedAlertTrigger( - dataSources: DataSources, monitorCtx: MonitorRunnerExecutionContext, workflow: Workflow, trigger: ChainedAlertTrigger, @@ -268,29 +298,23 @@ object CompositeWorkflowRunner : WorkflowRunner() { triggerCtx: ChainedAlertTriggerExecutionContext, dryRun: Boolean, triggerResults: MutableMap, + updatedAlerts: MutableList, ) { val triggerRunResult = monitorCtx.triggerService!!.runChainedAlertTrigger( workflow, trigger, triggerCtx.alertGeneratingMonitors, triggerCtx.monitorIdToAlertIdsMap ) triggerResults[trigger.id] = triggerRunResult - if (triggerRunResult.triggered) { + if (monitorCtx.triggerService!!.isChainedAlertTriggerActionable(triggerCtx, triggerRunResult)) { val actionCtx = triggerCtx for (action in trigger.actions) { triggerRunResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, workflow, dryRun) } - val alert = monitorCtx.alertService!!.composeChainedAlert( - triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList() - ) - if (!dryRun && workflow.id != Workflow.NO_ID) { - monitorCtx.retryPolicy?.let { - monitorCtx.alertService!!.saveAlerts( - dataSources, - listOf(alert), - it, - routingId = workflow.id - ) - } - } + } + val alert = monitorCtx.alertService!!.composeChainedAlert( + triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList(), triggerRunResult + ) + if (alert != null) { + updatedAlerts.add(alert) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt index 9ba5b05a9..4b954b168 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt @@ -31,6 +31,7 @@ import org.opensearch.commons.alerting.model.Table import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.core.common.Strings import org.opensearch.script.Script import org.opensearch.script.TemplateScript import java.time.Instant @@ -52,12 +53,15 @@ abstract class WorkflowRunner { dryrun: Boolean ): ActionRunResult { return try { + if (!MonitorRunnerService.isActionActionable(action, ctx.alert)) { + return ActionRunResult(action.id, action.name, mapOf(), true, null, null) + } val actionOutput = mutableMapOf() actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) { compileTemplate(action.subjectTemplate!!, ctx) } else "" actionOutput[Action.MESSAGE] = compileTemplate(action.messageTemplate, ctx) - if (actionOutput[Action.MESSAGE].isNullOrEmpty()) { + if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 31911fb10..89157d877 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -5719,4 +5719,87 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(idsSet0to99.all { it !in idsSet200to300 }) Assert.assertTrue(ids100to200.all { it !in idsSet200to300 }) } + + fun `test existing chained alert active alert is updated on consequtive trigger condition match`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + val notTrigger = randomChainedAlertTrigger( + name = "Not1OrNot2", + condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]") + ) + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id), + triggers = listOf(notTrigger) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + val workflowId = workflowById!!.id + + /** no ACTIVE alert exists and chained alert trigger matches. Expect: new ACTIVE alert created**/ + var executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertTrue(executeWorkflowResponse.workflowRunResult.triggerResults[notTrigger.id]!!.triggered) + val workflowAlerts = getWorkflowAlerts(workflowId) + Assert.assertTrue(workflowAlerts.alerts.size == 1) + Assert.assertEquals(workflowAlerts.alerts[0].state, Alert.State.ACTIVE) + /** ACTIVE alert exists and chained alert trigger matched again. Expect: existing alert updated and remains in ACTIVE*/ + var executeWorkflowResponse1 = executeWorkflow(workflowById, workflowId, false)!! + assertTrue(executeWorkflowResponse1.workflowRunResult.triggerResults[notTrigger.id]!!.triggered) + val udpdatedActiveAlerts = getWorkflowAlerts(workflowId) + Assert.assertTrue(udpdatedActiveAlerts.alerts.size == 1) + Assert.assertEquals(udpdatedActiveAlerts.alerts[0].state, Alert.State.ACTIVE) + Assert.assertTrue(udpdatedActiveAlerts.alerts[0].lastNotificationTime!! > workflowAlerts.alerts[0].lastNotificationTime!!) + + /** Acknowledge ACTIVE alert*/ + val ackChainedAlerts = ackChainedAlerts(udpdatedActiveAlerts.alerts.stream().map { it.id }.collect(Collectors.toList()), workflowId) + Assert.assertTrue(ackChainedAlerts.acknowledged.size == 1) + Assert.assertTrue(ackChainedAlerts.missing.size == 0) + Assert.assertTrue(ackChainedAlerts.failed.size == 0) + + /** ACKNOWLEDGED alert exists and chained alert trigger matched again. Expect: existing alert updated and remains ACKNOWLEDGED*/ + var executeWorkflowResponse2 = executeWorkflow(workflowById, workflowId, false)!! + assertTrue(executeWorkflowResponse2.workflowRunResult.triggerResults[notTrigger.id]!!.triggered) + val acknowledgedAlert = getWorkflowAlerts(workflowId, alertState = Alert.State.ACKNOWLEDGED) + Assert.assertTrue(acknowledgedAlert.alerts.size == 1) + Assert.assertEquals(acknowledgedAlert.alerts[0].state, Alert.State.ACKNOWLEDGED) + Assert.assertTrue(acknowledgedAlert.alerts[0].lastNotificationTime!! == udpdatedActiveAlerts.alerts[0].lastNotificationTime!!) + + /** ACKNOWLEDGED alert exists and chained alert trigger NOT matched. Expect: ACKNOWLEDGD alert marked as COMPLETED**/ + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + var executeWorkflowResponse3 = executeWorkflow(workflowById, workflowId, false)!! + assertFalse(executeWorkflowResponse3.workflowRunResult.triggerResults[notTrigger.id]!!.triggered) + val completedAlert = getWorkflowAlerts(workflowId, alertState = Alert.State.COMPLETED) + Assert.assertTrue(completedAlert.alerts.size == 1) + Assert.assertEquals(completedAlert.alerts[0].state, Alert.State.COMPLETED) + Assert.assertTrue(completedAlert.alerts[0].endTime!! > acknowledgedAlert.alerts[0].lastNotificationTime!!) + + /** COMPLETED state alert exists and trigger matches. Expect: new ACTIVE state chaiend alert created*/ + var executeWorkflowResponse4 = executeWorkflow(workflowById, workflowId, false)!! + assertTrue(executeWorkflowResponse4.workflowRunResult.triggerResults[notTrigger.id]!!.triggered) + val newActiveAlert = getWorkflowAlerts(workflowId, alertState = Alert.State.ACTIVE) + Assert.assertTrue(newActiveAlert.alerts.size == 1) + Assert.assertEquals(newActiveAlert.alerts[0].state, Alert.State.ACTIVE) + Assert.assertTrue(newActiveAlert.alerts[0].lastNotificationTime!! > acknowledgedAlert.alerts[0].lastNotificationTime!!) + val completedAlert1 = getWorkflowAlerts(workflowId, alertState = Alert.State.COMPLETED) + Assert.assertTrue(completedAlert1.alerts.size == 1) + Assert.assertEquals(completedAlert1.alerts[0].state, Alert.State.COMPLETED) + Assert.assertTrue(completedAlert1.alerts[0].endTime!! > acknowledgedAlert.alerts[0].lastNotificationTime!!) + } } From 584184f498963db4f660be0fa1abe3847f6309f7 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 7 Aug 2023 14:08:10 -0700 Subject: [PATCH 2/3] address comments Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertService.kt | 2 +- .../ChainedAlertTriggerExecutionContext.kt | 20 ------------------- .../workflow/CompositeWorkflowRunner.kt | 2 ++ 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 2a69aea62..a3f854b10 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -347,7 +347,7 @@ class AlertService( else Alert.State.ERROR Alert( startTime = Instant.now(), - lastNotificationTime = Instant.now(), + lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null, schemaVersion = -1, chainedAlertTrigger = ctx.trigger, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt index a05b8c4bc..d4bf4cb59 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt @@ -23,26 +23,6 @@ data class ChainedAlertTriggerExecutionContext( val alert: Alert? = null ) { - constructor( - workflow: Workflow, - workflowRunResult: WorkflowRunResult, - trigger: ChainedAlertTrigger, - alertGeneratingMonitors: Set, - monitorIdToAlertIdsMap: Map>, - alert: Alert? = null - ) : - this( - workflow, - workflowRunResult, - workflowRunResult.executionStartTime, - workflowRunResult.executionEndTime, - workflowRunResult.error, - trigger, - alertGeneratingMonitors, - monitorIdToAlertIdsMap, - alert - ) - /** * Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index ffbdce821..bc9ac98dc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -171,6 +171,8 @@ object CompositeWorkflowRunner : WorkflowRunner() { val triggerCtx = ChainedAlertTriggerExecutionContext( workflow = workflow, workflowRunResult = workflowRunResult, + periodStart = workflowRunResult.executionStartTime, + periodEnd = workflowRunResult.executionEndTime, trigger = caTrigger, alertGeneratingMonitors = monitorIdToAlertIdsMap.keys, monitorIdToAlertIdsMap = monitorIdToAlertIdsMap, From b41a77e3199cf0d7e4404836ab52718556e994d1 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 15 Aug 2023 11:36:08 -0700 Subject: [PATCH 3/3] update comment for search alerts method Signed-off-by: Surya Sashank Nistala --- .../src/main/kotlin/org/opensearch/alerting/AlertService.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index a3f854b10..4ba28a408 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -349,7 +349,7 @@ class AlertService( startTime = Instant.now(), lastNotificationTime = currentTime, state = Alert.State.ACTIVE, - errorMessage = null, schemaVersion = -1, + errorMessage = null, schemaVersion = IndexUtils.alertIndexSchemaVersion, chainedAlertTrigger = ctx.trigger, executionId = executionId, workflow = workflow, @@ -846,7 +846,7 @@ class AlertService( } /** - * Searches for Alerts in the monitor's alertIndex. + * Searches for ACTIVE/ACKNOWLEDGED chained alerts in the workflow's alertIndex. * * @param monitorId The Monitor to get Alerts for * @param size The number of search hits (Alerts) to return