From 063ed65cfce1066499007280dc5fafbd9761473c Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 5 Mar 2024 08:33:41 -0800 Subject: [PATCH] Added support for printing query/rule info in notification messages. Signed-off-by: AWSHurneyt --- .../alerting/DocumentLevelMonitorRunner.kt | 23 +++++++- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../alerting/DocumentMonitorRunnerIT.kt | 59 +++++++++++++++++++ 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ff939418a..234653251 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -42,6 +42,7 @@ import org.opensearch.commons.alerting.action.PublishFindingsRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -95,6 +96,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + monitorCtx.findingsToTriggeredQueries = mutableMapOf() try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) @@ -456,6 +458,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) val alerts = mutableListOf() + val alertContexts = mutableListOf() triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), @@ -466,6 +469,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() { workflorwRunContext = workflowRunContext ) alerts.add(alert) + alertContexts.add( + AlertContext( + alert = alert, + associatedQueries = alert.findingIds.flatMap { findingId -> + monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList() + } + ) + ) } val shouldDefaultToPerExecution = defaultToPerExecutionAction( @@ -479,13 +490,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { for (action in trigger.actions) { val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { - for (alert in alerts) { + for (alert in alertContexts) { val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) } - } else if (alerts.isNotEmpty()) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun) + } else if (alertContexts.isNotEmpty()) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun) for (alert in alerts) { triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) @@ -532,6 +543,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() + val findingsToTriggeredQueries = mutableMapOf>() docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -552,6 +564,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) + findingsToTriggeredQueries[finding.id] = triggeredQueries val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -578,6 +591,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // suppress exception logger.error("Optional finding callback failed", e) } + + if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries + else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries + return findingDocPairs } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 424656c6b..307c88b3b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -17,6 +17,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService @@ -38,6 +39,7 @@ data class MonitorRunnerExecutionContext( var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, + var findingsToTriggeredQueries: Map>? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index ed0668daf..22458913f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -1997,6 +1997,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals(1, output.objectMap("trigger_results").values.size) } + fun `test document-level monitor notification message includes queries`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val alertCategories = AlertCategory.values() + val actionExecutionScope = PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet() + ) + val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) + val actions = (0..randomInt(10)).map { + randomActionWithPolicy( + template = randomTemplateScript("{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}"), + destinationId = createDestination().id, + actionExecutionPolicy = actionExecutionPolicy + ) + } + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(2, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + assertEquals(actions.size, alertActionResult.values.size) + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] as Map + assertTrue( + "The notification message is missing the query name.", + actionOutput["message"]!!.contains("(name=${docQuery.name})") + ) + } + } + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> {