From 65f7f097c24ca5e5267644ccaab605b7e68ccd40 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 8 Feb 2024 17:05:28 -0800 Subject: [PATCH 01/16] Adding dev logs. Signed-off-by: AWSHurneyt --- .../alerting/script/BucketLevelTriggerExecutionContext.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index 72518ed48..2365babc9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -5,6 +5,7 @@ package org.opensearch.alerting.script +import org.apache.logging.log4j.LogManager import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.Alert @@ -12,6 +13,8 @@ import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant +private val logger = LogManager.getLogger(BucketLevelTriggerExecutionContext::class.java) + data class BucketLevelTriggerExecutionContext( override val monitor: Monitor, val trigger: BucketLevelTrigger, @@ -41,11 +44,13 @@ data class BucketLevelTriggerExecutionContext( * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. */ override fun asTemplateArg(): Map { + logger.info("hurneyt BucketLevelTriggerExecutionContext::results = {}", results) val tempArg = super.asTemplateArg().toMutableMap() tempArg["trigger"] = trigger.asTemplateArg() tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() } tempArg["newAlerts"] = newAlerts.map { it.asTemplateArg() } tempArg["completedAlerts"] = completedAlerts.map { it.asTemplateArg() } + tempArg["results"] = results return tempArg } } From dc8431ff50471b7e59dcff2b4a0bf473c3f16dec Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 4 Mar 2024 05:05:22 -0800 Subject: [PATCH 02/16] Added support for returning sample documents for bucket level monitors. 
Signed-off-by: AWSHurneyt --- .../alerting/BucketLevelMonitorRunner.kt | 91 ++++++++++++++++++- .../org/opensearch/alerting/InputService.kt | 87 +++++++++++------- .../BucketLevelTriggerExecutionContext.kt | 1 - .../alerting/util/AggregationQueryRewriter.kt | 34 ++++++- .../opensearch/alerting/util/AlertingUtils.kt | 4 +- .../alerting/MonitorRunnerServiceIT.kt | 87 ++++++++++++++++++ 6 files changed, 264 insertions(+), 40 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index e960b9da5..512b96a3d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -17,6 +17,7 @@ import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.opensearchapi.withClosableContext @@ -29,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor @@ -220,6 +222,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { ?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap)) } + val alertSampleDocs = mutableMapOf>>>() for (trigger in monitor.triggers) { val alertsToUpdate = mutableSetOf() val completedAlertsToUpdate = mutableSetOf() @@ -230,6 +233,65 @@ object BucketLevelMonitorRunner : MonitorRunner() { ?: mutableListOf() // Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts) + + @Suppress("UNCHECKED_CAST") + if (!nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()) { + val sampleDocumentsByBucket = mutableMapOf>>() + try { + val searchRequest = monitorCtx.inputService!!.getSearchRequest( + monitor = monitor.copy(triggers = listOf(trigger)), + searchInput = monitor.inputs[0] as SearchInput, + periodStart = periodStart, + periodEnd = periodEnd, + prevResult = monitorResult.inputResults, + matchingDocIdsPerIndex = null, + returnSampleDocs = true + ) + + val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) } + val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf()) as Map + val compositeAgg = aggs.getOrDefault("composite_agg", mapOf()) as Map + val buckets = compositeAgg.getOrDefault("buckets", emptyList>()) as List> + + buckets.forEach { bucket -> + val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf()) as Map).values.toList()) + if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.") + + val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val topHits = 
unwrappedTopHits.getOrDefault("hits", listOf>()) as List> + + val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val lowHits = unwrappedLowHits.getOrDefault("hits", listOf>()) as List> + + val allHits = topHits + lowHits + + if (allHits.isEmpty()) { + // We expect sample documents to be available for each bucket. + logger.error("Sample documents not found for trigger {} of monitor {}.", trigger.id, monitor.id) + } + + // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each. + // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data. + val uniqueHitIds = mutableSetOf() + val dedupedHits = mutableListOf>() + allHits.forEach { hit -> + val hitId = hit["_id"] as String + if (!uniqueHitIds.contains(hitId)) { + uniqueHitIds.add(hitId) + dedupedHits.add(hit) + } + } + sampleDocumentsByBucket[bucketKey] = dedupedHits + } + + alertSampleDocs[trigger.id] = sampleDocumentsByBucket + } catch (e: Exception) { + logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e) + } + } + val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() @@ -255,8 +317,11 @@ object BucketLevelMonitorRunner : MonitorRunner() { for (alertCategory in actionExecutionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { + val alertContext = if (alertCategory != AlertCategory.NEW) alert + else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs) + val actionCtx = getActionContextForAlertCategory( - alertCategory, alert, triggerCtx, monitorOrTriggerError + alertCategory, alertContext, triggerCtx, monitorOrTriggerError ) // AggregationResultBucket should not be null here val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash() @@ -287,7 +352,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { val actionCtx = triggerCtx.copy( dedupedAlerts = dedupedAlerts, - newAlerts = newAlerts, + newAlerts = newAlerts.map { + getAlertContext(alert = it, alertSampleDocs = alertSampleDocs) + }, completedAlerts = completedAlerts, error = monitorResult.error ?: triggerResult.error ) @@ -493,4 +560,24 @@ object BucketLevelMonitorRunner : MonitorRunner() { ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error) } } + + private fun getAlertContext( + alert: Alert, + alertSampleDocs: Map>>> + ): Alert { + val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash() + val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey) + return if (!bucketKey.isNullOrEmpty() && !sampleDocs.isNullOrEmpty()) { + AlertContext(alert = alert, sampleDocs = sampleDocs) + } else { + logger.warn( + "Error retrieving sample documents for alert {} from trigger {} of monitor {} during execution {}.", + alert.id, + alert.triggerId, + alert.monitorId, + alert.executionId + ) + alert + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index e0e06606c..82c16af37 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -82,39 +82,15 @@ class InputService( 
monitor.inputs.forEach { input -> when (input) { is SearchInput -> { - // TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient - val searchParams = mapOf( - "period_start" to periodStart.toEpochMilli(), - "period_end" to periodEnd.toEpochMilli() + val searchRequest = getSearchRequest( + monitor = monitor, + searchInput = input, + periodStart = periodStart, + periodEnd = periodEnd, + prevResult = prevResult, + matchingDocIdsPerIndex = matchingDocIdsPerIndex, + returnSampleDocs = false ) - - // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly - // which causes a strange bug where the rewritten query persists on the Monitor across executions - val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers) - - // Rewrite query to consider the doc ids per given index - if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { - val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) - rewrittenQuery.query(updatedSourceQuery) - } - - val searchSource = scriptService.compile( - Script( - ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, - rewrittenQuery.toString(), searchParams - ), - TemplateScript.CONTEXT - ) - .newInstance(searchParams) - .execute() - - val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService) - val searchRequest = SearchRequest() - .indices(*indexes.toTypedArray()) - .preference(Preference.PRIMARY_FIRST.type()) - XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { - searchRequest.source(SearchSourceBuilder.fromXContent(it)) - } val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } aggTriggerAfterKey += AggregationQueryRewriter.getAfterKeysFromSearchResponse( searchResponse, @@ -259,4 +235,51 @@ class InputService( InputRunResults(emptyList(), e) } } + + suspend fun getSearchRequest( + monitor: Monitor, + searchInput: SearchInput, + periodStart: Instant, + periodEnd: Instant, + prevResult: InputRunResults?, + matchingDocIdsPerIndex: Map>?, + returnSampleDocs: Boolean = false + ): SearchRequest { + // TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient + val searchParams = mapOf( + "period_start" to periodStart.toEpochMilli(), + "period_end" to periodEnd.toEpochMilli() + ) + + // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly + // which causes a strange bug where the rewritten query persists on the Monitor across executions + val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(searchInput.query), prevResult, monitor.triggers, returnSampleDocs) + + // Rewrite query to consider the doc ids per given index + if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { + val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) 
+ rewrittenQuery.query(updatedSourceQuery) + } + + val searchSource = scriptService.compile( + Script( + ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, + rewrittenQuery.toString(), searchParams + ), + TemplateScript.CONTEXT + ) + .newInstance(searchParams) + .execute() + + val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService) + val searchRequest = SearchRequest() + .indices(*indexes.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) + + XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { + searchRequest.source(SearchSourceBuilder.fromXContent(it)) + } + + return searchRequest + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index 2365babc9..b3478cfb8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -44,7 +44,6 @@ data class BucketLevelTriggerExecutionContext( * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. */ override fun asTemplateArg(): Map { - logger.info("hurneyt BucketLevelTriggerExecutionContext::results = {}", results) val tempArg = super.asTemplateArg().toMutableMap() tempArg["trigger"] = trigger.asTemplateArg() tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index e1b6675b2..ec62d0e84 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -11,12 +11,14 @@ import org.opensearch.alerting.model.TriggerAfterKey import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Trigger import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.SingleBucketAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.support.AggregationPath import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.sort.SortOrder class AggregationQueryRewriter { @@ -26,10 +28,18 @@ class AggregationQueryRewriter { * for each trigger. */ fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List): SearchSourceBuilder { + return rewriteQuery(query, prevResult, triggers, false) + } + + /** + * Optionally adds support for returning sample documents for each bucket of data returned for a bucket level monitor. 
+ */ + fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List, returnSampleDocs: Boolean = false): SearchSourceBuilder { triggers.forEach { trigger -> if (trigger is BucketLevelTrigger) { // add bucket selector pipeline aggregation for each trigger in query query.aggregation(trigger.bucketSelector) + // if this request is processing the subsequent pages of input query result, then add after key if (prevResult?.aggTriggersAfterKey?.get(trigger.id) != null) { val parentBucketPath = AggregationPath.parse(trigger.bucketSelector.parentBucketPath) @@ -48,11 +58,27 @@ class AggregationQueryRewriter { throw IllegalArgumentException("ParentBucketPath: $parentBucketPath not found in input query results") } } + if (factory is CompositeAggregationBuilder) { - // if the afterKey from previous result is null, what does it signify? - // A) result set exhausted OR B) first page ? - val afterKey = prevResult.aggTriggersAfterKey[trigger.id]!!.afterKey - factory.aggregateAfter(afterKey) + if (returnSampleDocs) { + // TODO: Returning sample documents should ideally be a toggleable option at the action level. + val sampleDocsAgg = listOf( + AggregationBuilders.topHits("low_hits") + .size(5) + .sort("_score", SortOrder.ASC), + AggregationBuilders.topHits("top_hits") + .size(5) + .sort("_score", SortOrder.DESC) + ) + sampleDocsAgg.forEach { agg -> + if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg) + } + } else { + // if the afterKey from previous result is null, what does it signify? + // A) result set exhausted OR B) first page ? + val afterKey = prevResult.aggTriggersAfterKey[trigger.id]!!.afterKey + factory.aggregateAfter(afterKey) + } } else { throw IllegalStateException("AfterKeys are not expected to be present in non CompositeAggregationBuilder") } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index c912768cb..4504c22ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -76,7 +76,9 @@ fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.Monitor * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. */ -fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#") +fun AggregationResultBucket.getBucketKeysHash(): String = getBucketKeysHash(this.bucketKeys) + +fun getBucketKeysHash(bucketKeys: List): String = bucketKeys.joinToString(separator = "#") fun Action.getActionExecutionPolicy(monitor: Monitor): ActionExecutionPolicy? 
{ // When the ActionExecutionPolicy is null for an Action, the default is resolved at runtime diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 27a653d5f..21cefddf3 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -25,6 +25,7 @@ import org.opensearch.commons.alerting.model.Alert.State.ACKNOWLEDGED import org.opensearch.commons.alerting.model.Alert.State.ACTIVE import org.opensearch.commons.alerting.model.Alert.State.COMPLETED import org.opensearch.commons.alerting.model.Alert.State.ERROR +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery @@ -1506,6 +1507,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) + assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 1, findings.size) @@ -1575,6 +1577,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) + assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 1, findings.size) @@ -1645,6 +1648,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 0) + assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 0, findings.size) @@ -1941,6 +1945,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { mutableMapOf(Pair(actionThrottleEnabled.id, 0), Pair(actionThrottleNotEnabled.id, 0)) ) assertEquals(notThrottledActionResults.size, 2) + // Save the lastExecutionTimes of the actions for the Alert to be compared later against // the next Monitor execution run previousAlertExecutionTime[it.id] = mutableMapOf() @@ -1989,6 +1994,88 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { } } + fun `test bucket-level monitor notification message includes sample docs per bucket`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", + "test_value_2" + ) + ) + + val messageSource = "{{#ctx.newAlerts}}\n{{#sample_documents}}\n (docId={{_id}}) \n{{/sample_documents}}\n{{/ctx.newAlerts}}" + val bucket1DocIds = listOf("(docId=1)", "(docId=2)") + val bucket2DocIds = listOf("(docId=3)") + + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + 
TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + val action = randomAction( + template = randomTemplateScript(source = messageSource), + destinationId = createDestination().id + ) + var trigger = randomBucketLevelTrigger(actions = listOf(action)) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + + val output = entityAsMap(executeMonitor(monitor.id)) + // The 'events' in this case are the bucketKeys hashes representing the Alert events + val expectedEvents = setOf("test_value_1", "test_value_2") + + assertEquals(monitor.name, output["monitor_name"]) + for (triggerResult in output.objectMap("trigger_results").values) { + for (alertEvent in triggerResult.objectMap("action_results")) { + assertTrue(expectedEvents.contains(alertEvent.key)) + val actionResults = alertEvent.value.values as Collection> + for (actionResult in actionResults) { + val actionOutput = actionResult["output"] as Map + if (actionResult["name"] == action.name) { + when (alertEvent.key) { + "test_value_1" -> bucket1DocIds.forEach { docEntry -> + assertTrue( + "The notification message is missing docEntry $docEntry", + !actionOutput["message"].isNullOrEmpty() && actionOutput["message"]!!.contains(docEntry) + ) + } + "test_value_2" -> bucket2DocIds.forEach { docEntry -> + assertTrue( + "The notification message is missing docEntry $docEntry", + !actionOutput["message"].isNullOrEmpty() && actionOutput["message"]!!.contains(docEntry) + ) + } + } + } else { + fail("Unknown action: ${actionResult["name"]}") + } + } + } + } + } + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { From 35a5264fc23cf8856b9c0f702cfcba81e31a9ff4 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 5 Mar 2024 08:33:41 -0800 Subject: [PATCH 03/16] Added support for printing query/rule info in notification messages. 
Signed-off-by: AWSHurneyt --- .../alerting/DocumentLevelMonitorRunner.kt | 23 +++++++- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../alerting/DocumentMonitorRunnerIT.kt | 59 +++++++++++++++++++ 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ff939418a..234653251 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -42,6 +42,7 @@ import org.opensearch.commons.alerting.action.PublishFindingsRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -95,6 +96,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + monitorCtx.findingsToTriggeredQueries = mutableMapOf() try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) @@ -456,6 +458,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) val alerts = mutableListOf() + val alertContexts = mutableListOf() triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), @@ -466,6 +469,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() { workflorwRunContext = workflowRunContext ) alerts.add(alert) + alertContexts.add( + AlertContext( + alert = alert, + associatedQueries = alert.findingIds.flatMap { findingId -> + monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList() + } + ) + ) } val shouldDefaultToPerExecution = defaultToPerExecutionAction( @@ -479,13 +490,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { for (action in trigger.actions) { val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { - for (alert in alerts) { + for (alert in alertContexts) { val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) } - } else if (alerts.isNotEmpty()) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun) + } else if (alertContexts.isNotEmpty()) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun) for (alert in alerts) { triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) @@ -532,6 +543,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() + val findingsToTriggeredQueries = mutableMapOf>() docsToQueries.forEach { val 
triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -552,6 +564,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) + findingsToTriggeredQueries[finding.id] = triggeredQueries val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -578,6 +591,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // suppress exception logger.error("Optional finding callback failed", e) } + + if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries + else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries + return findingDocPairs } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 2aed10a9a..f289aa390 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -18,6 +18,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService @@ -39,6 +40,7 @@ data class MonitorRunnerExecutionContext( var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, + var findingsToTriggeredQueries: Map>? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? 
= null, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index cc65aa849..5067379cb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2426,6 +2426,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test document-level monitor notification message includes queries`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "test-query", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val alertCategories = AlertCategory.values() + val actionExecutionScope = PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet() + ) + val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) + val actions = (0..randomInt(10)).map { + randomActionWithPolicy( + template = randomTemplateScript("{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}"), + destinationId = createDestination().id, + actionExecutionPolicy = actionExecutionPolicy + ) + } + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(2, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + assertEquals(actions.size, alertActionResult.values.size) + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] as Map + assertTrue( + "The notification message is missing the query name.", + actionOutput["message"]!!.contains("(name=${docQuery.name})") + ) + } + } + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { From 2cf566741172738cdb06b5cba52f67150661afe0 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 6 Mar 2024 16:56:41 -0800 Subject: [PATCH 04/16] Extracted out helper function. 
Signed-off-by: AWSHurneyt --- .../alerting/BucketLevelMonitorRunner.kt | 5 +- .../org/opensearch/alerting/InputService.kt | 2 +- .../opensearch/alerting/util/AlertingUtils.kt | 75 +++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 512b96a3d..947b22af4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -26,6 +26,7 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult +import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType @@ -234,8 +235,10 @@ object BucketLevelMonitorRunner : MonitorRunner() { // Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts) + // Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data. + val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty() @Suppress("UNCHECKED_CAST") - if (!nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()) { + if (isTriggered && printsSampleDocData(trigger)) { val sampleDocumentsByBucket = mutableMapOf>>() try { val searchRequest = monitorCtx.inputService!!.getSearchRequest( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 82c16af37..279c20b44 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -236,7 +236,7 @@ class InputService( } } - suspend fun getSearchRequest( + fun getSearchRequest( monitor: Monitor, searchInput: SearchInput, periodStart: Instant, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 4504c22ff..55dea88e0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -13,11 +13,14 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.commons.alerting.model.AggregationResultBucket +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Trigger import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.script.Script private val logger = LogManager.getLogger("AlertingUtils") @@ -179,3 +182,75 @@ fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) 
= when (cause) { cause.addSuppressed(closeException) } } + +/** + * Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for + * any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags. + * This indicates the message is expected to print data from the sample docs, so we need to collect the samples. + */ +fun printsSampleDocData(trigger: Trigger): Boolean { + return trigger.actions.any { action -> + // The {{ctx}} mustache tag indicates the entire ctx object should be printed in the message string. + // TODO: Consider excluding `{{ctx}}` from criteria for bucket-level triggers as printing all of + // their sample documents could make the notification message too large to send. + action.messageTemplate.idOrCode.contains("{{ctx}}") || + action.messageTemplate.idOrCode.contains(AlertContext.SAMPLE_DOCS_FIELD) + } +} + +fun printsSampleDocData(triggers: List): Boolean { + return triggers.any { trigger -> printsSampleDocData(trigger) } +} + +/** + * Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block. + * https://mustache.github.io/mustache.5.html + * + * This function looks `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks, + * and parses the contents for tags, which we interpret as fields within the sample document. + * + * @return a [Set] of [String]s indicating fields within a document. + */ +fun parseSampleDocTags(messageTemplate: Script): Set { + val sampleBlockPrefix = "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}" + val sampleBlockSuffix = "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}" + val tagRegex = Regex("\\{\\{([^{}]+)}}") + val tags = mutableSetOf() + try { + // Identify the start and end points of the sample block + var sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix) + var sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart) + + // Sample start/end of -1 indicates there are no more complete sample blocks + while (sampleBlockStart != -1 && sampleBlockEnd != -1) { + // Isolate the sample block + val sampleBlock = messageTemplate.idOrCode.substring(sampleBlockStart, sampleBlockEnd) + // Remove the iteration wrapper tags + .removeSurrounding(sampleBlockPrefix, sampleBlockSuffix) + + // Search for each tag + tagRegex.findAll(sampleBlock).forEach { match -> + // Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `_source.timestamp`) + val docField = match.groupValues[1].trim() + if (docField.isNotEmpty()) tags.add(docField) + } + + // Identify any subsequent sample blocks + sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, sampleBlockEnd) + sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart) + } + } catch (e: Exception) { + logger.warn("Failed to parse sample document fields.", e) + } + return tags +} + +fun parseSampleDocTags(actions: List): Set { + return actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) } + .toSet() +} + +fun parseSampleDocTags(triggers: List): Set { + return triggers.flatMap { trigger -> parseSampleDocTags(trigger.actions) } + .toSet() +} From 60e3091a75e9619339de95d8e28b65e1b45b3426 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 6 Mar 2024 17:09:24 -0800 Subject: [PATCH 05/16] Extracted out helper function. 
Signed-off-by: AWSHurneyt --- .../main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 55dea88e0..0f092290c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -194,7 +194,7 @@ fun printsSampleDocData(trigger: Trigger): Boolean { // TODO: Consider excluding `{{ctx}}` from criteria for bucket-level triggers as printing all of // their sample documents could make the notification message too large to send. action.messageTemplate.idOrCode.contains("{{ctx}}") || - action.messageTemplate.idOrCode.contains(AlertContext.SAMPLE_DOCS_FIELD) + action.messageTemplate.idOrCode.contains(AlertContext.SAMPLE_DOCS_FIELD) } } From 2d469227a32aa9edecaccd753d9524c8eaec6ce5 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 11 Mar 2024 00:14:02 -0700 Subject: [PATCH 06/16] Added support for printing document data in notification messages for document level monitors. Signed-off-by: AWSHurneyt --- .../alerting/DocumentLevelMonitorRunner.kt | 59 +++++- .../BucketLevelTriggerExecutionContext.kt | 18 +- .../DocumentLevelTriggerExecutionContext.kt | 9 +- .../alerting/util/AggregationQueryRewriter.kt | 4 + .../opensearch/alerting/util/AlertingUtils.kt | 86 +++++---- .../alerting/DocumentMonitorRunnerIT.kt | 154 +++++++++++++++ .../alerting/util/AlertingUtilsTests.kt | 179 ++++++++++++++++++ 7 files changed, 465 insertions(+), 44 deletions(-) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 234653251..637f7096c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse +import org.opensearch.action.get.MultiGetItemResponse +import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest @@ -23,12 +25,15 @@ import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.parseSampleDocTags +import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.node.NodeClient import 
org.opensearch.cluster.metadata.IndexMetadata @@ -65,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant @@ -84,6 +90,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * Docs are fetched from the source index per shard and transformed.*/ val transformedDocs = mutableListOf>() + // Maps a finding ID to the concrete index name. + val findingIdToConcreteIndex = mutableMapOf() + + // Maps the docId to the doc source + val docIdToDocMap = mutableMapOf>() + override suspend fun runMonitor( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, @@ -457,6 +469,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { error = monitorResult.error ?: triggerResult.error ) + if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty()) + getDocSources( + findingToDocPairs = findingToDocPairs, + monitorCtx = monitorCtx, + monitor = monitor + ) + val alerts = mutableListOf() val alertContexts = mutableListOf() triggerFindingDocPairs.forEach { @@ -469,12 +488,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() { workflorwRunContext = workflowRunContext ) alerts.add(alert) + + val docId = alert.relatedDocIds.first().split("|").first() + val docSource = docIdToDocMap[docId]?.find { item -> + findingIdToConcreteIndex[alert.findingIds.first()] == item.index + }?.response?.convertToMap() + alertContexts.add( AlertContext( alert = alert, associatedQueries = alert.findingIds.flatMap { findingId -> monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList() - } + }, + sampleDocs = listOfNotNull(docSource) ) ) } @@ -565,6 +591,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) findingsToTriggeredQueries[finding.id] = triggeredQueries + findingIdToConcreteIndex[finding.id] = finding.index val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -1064,6 +1091,36 @@ class DocumentLevelMonitorRunner : MonitorRunner() { return numDocs >= maxNumDocsThreshold } + /** + * Performs an mGet request to retrieve the documents associated with findings. + * + * When possible, this will only retrieve the document fields that are specifically + * referenced for printing in the mustache template. 
+ */ + private suspend fun getDocSources( + findingToDocPairs: List>, + monitorCtx: MonitorRunnerExecutionContext, + monitor: Monitor + ) { + val docFieldTags = parseSampleDocTags(monitor.triggers) + val request = MultiGetRequest() + findingToDocPairs.forEach { (_, docIdAndIndex) -> + val docIdAndIndexSplit = docIdAndIndex.split("|") + val docId = docIdAndIndexSplit[0] + val concreteIndex = docIdAndIndexSplit[1] + if (docId.isNotEmpty() && concreteIndex.isNotEmpty()) { + val docItem = MultiGetRequest.Item(concreteIndex, docId) + if (docFieldTags.isNotEmpty()) + docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) + request.add(docItem) + } + } + val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } + response.responses.forEach { item -> + docIdToDocMap.getOrPut(item.id) { mutableListOf() }.add(item) + } + } + /** * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index b3478cfb8..d0e90fa24 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -45,11 +45,19 @@ data class BucketLevelTriggerExecutionContext( */ override fun asTemplateArg(): Map { val tempArg = super.asTemplateArg().toMutableMap() - tempArg["trigger"] = trigger.asTemplateArg() - tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() } - tempArg["newAlerts"] = newAlerts.map { it.asTemplateArg() } - tempArg["completedAlerts"] = completedAlerts.map { it.asTemplateArg() } - tempArg["results"] = results + tempArg[TRIGGER_FIELD] = trigger.asTemplateArg() + tempArg[DEDUPED_ALERTS_FIELD] = dedupedAlerts.map { it.asTemplateArg() } + tempArg[NEW_ALERTS_FIELD] = newAlerts.map { it.asTemplateArg() } + tempArg[COMPLETED_ALERTS_FIELD] = completedAlerts.map { it.asTemplateArg() } + tempArg[RESULTS_FIELD] = results return tempArg } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val DEDUPED_ALERTS_FIELD = "dedupedAlerts" + const val NEW_ALERTS_FIELD = "newAlerts" + const val COMPLETED_ALERTS_FIELD = "completedAlerts" + const val RESULTS_FIELD = "results" + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt index 66de731f6..8035d8f58 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -37,8 +37,13 @@ data class DocumentLevelTriggerExecutionContext( */ override fun asTemplateArg(): Map { val tempArg = super.asTemplateArg().toMutableMap() - tempArg["trigger"] = trigger.asTemplateArg() - tempArg["alerts"] = alerts.map { it.asTemplateArg() } + tempArg[TRIGGER_FIELD] = trigger.asTemplateArg() + tempArg[ALERTS_FIELD] = alerts.map { it.asTemplateArg() } return tempArg } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val ALERTS_FIELD = "alerts" + } } diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index ec62d0e84..5a211464d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -18,6 +18,7 @@ import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.support.AggregationPath import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.search.sort.SortOrder class AggregationQueryRewriter { @@ -62,6 +63,8 @@ class AggregationQueryRewriter { if (factory is CompositeAggregationBuilder) { if (returnSampleDocs) { // TODO: Returning sample documents should ideally be a toggleable option at the action level. + // For now, identify which fields to return from the doc _source for the trigger's actions. + val docFieldTags = parseSampleDocTags(listOf(trigger)) val sampleDocsAgg = listOf( AggregationBuilders.topHits("low_hits") .size(5) @@ -71,6 +74,7 @@ class AggregationQueryRewriter { .sort("_score", SortOrder.DESC) ) sampleDocsAgg.forEach { agg -> + if (docFieldTags.isNotEmpty()) agg.fetchSource(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg) } } else { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 0f092290c..9087631b7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -8,12 +8,16 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.AlertContext +import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.Trigger import org.opensearch.commons.alerting.model.action.Action @@ -183,30 +187,11 @@ fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) { } } -/** - * Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for - * any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags. - * This indicates the message is expected to print data from the sample docs, so we need to collect the samples. - */ -fun printsSampleDocData(trigger: Trigger): Boolean { - return trigger.actions.any { action -> - // The {{ctx}} mustache tag indicates the entire ctx object should be printed in the message string. 
- // TODO: Consider excluding `{{ctx}}` from criteria for bucket-level triggers as printing all of - // their sample documents could make the notification message too large to send. - action.messageTemplate.idOrCode.contains("{{ctx}}") || - action.messageTemplate.idOrCode.contains(AlertContext.SAMPLE_DOCS_FIELD) - } -} - -fun printsSampleDocData(triggers: List): Boolean { - return triggers.any { trigger -> printsSampleDocData(trigger) } -} - /** * Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block. * https://mustache.github.io/mustache.5.html * - * This function looks `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks, + * This function looks for `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks, * and parses the contents for tags, which we interpret as fields within the sample document. * * @return a [Set] of [String]s indicating fields within a document. @@ -214,30 +199,35 @@ fun printsSampleDocData(triggers: List): Boolean { fun parseSampleDocTags(messageTemplate: Script): Set { val sampleBlockPrefix = "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}" val sampleBlockSuffix = "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}" + val sourcePrefix = "_source." val tagRegex = Regex("\\{\\{([^{}]+)}}") val tags = mutableSetOf() try { // Identify the start and end points of the sample block - var sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix) - var sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart) + var blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix) + var blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart) // Sample start/end of -1 indicates there are no more complete sample blocks - while (sampleBlockStart != -1 && sampleBlockEnd != -1) { + while (blockStart != -1 && blockEnd != -1) { // Isolate the sample block - val sampleBlock = messageTemplate.idOrCode.substring(sampleBlockStart, sampleBlockEnd) + val sampleBlock = messageTemplate.idOrCode.substring(blockStart, blockEnd) // Remove the iteration wrapper tags - .removeSurrounding(sampleBlockPrefix, sampleBlockSuffix) + .removePrefix(sampleBlockPrefix) + .removeSuffix(sampleBlockSuffix) // Search for each tag tagRegex.findAll(sampleBlock).forEach { match -> - // Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `_source.timestamp`) - val docField = match.groupValues[1].trim() - if (docField.isNotEmpty()) tags.add(docField) + // Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `timestamp`) + var docField = match.groupValues[1].trim() + if (docField.startsWith(sourcePrefix)) { + docField = docField.removePrefix(sourcePrefix) + if (docField.isNotEmpty()) tags.add(docField) + } } // Identify any subsequent sample blocks - sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, sampleBlockEnd) - sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart) + blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, blockEnd) + blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart) } } catch (e: Exception) { logger.warn("Failed to parse sample document fields.", e) @@ -245,12 +235,36 @@ fun parseSampleDocTags(messageTemplate: Script): Set { return tags } -fun parseSampleDocTags(actions: List): Set { - return actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) } - .toSet() +fun parseSampleDocTags(triggers: 
List): Set { + return triggers.flatMap { trigger -> + trigger.actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) } + }.toSet() } -fun parseSampleDocTags(triggers: List): Set { - return triggers.flatMap { trigger -> parseSampleDocTags(trigger.actions) } - .toSet() +/** + * Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for + * any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags. + * This indicates the message is expected to print data from the sample docs, so we need to collect the samples. + */ +fun printsSampleDocData(trigger: Trigger): Boolean { + return trigger.actions.any { action -> + val alertsField = when (trigger) { + is BucketLevelTrigger -> "{{ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}}" + is DocumentLevelTrigger -> "{{ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}}" + // Only bucket, and document level monitors are supported currently. + else -> return false + } + + // TODO: Consider excluding the following tags from TRUE criteria (especially for bucket-level triggers) as + // printing all of the sample documents could make the notification message too large to send. + // 1. {{ctx}} - prints entire ctx object in the message string + // 2. {{ctx.}} - prints entire alerts array in the message string, which includes the sample docs + // 3. {{AlertContext.SAMPLE_DOCS_FIELD}} - prints entire sample docs array in the message string + val validTags = listOfNotNull( + "{{ctx}}", + alertsField, + AlertContext.SAMPLE_DOCS_FIELD + ) + validTags.any { tag -> action.messageTemplate.idOrCode.contains(tag) } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 5067379cb..15715b040 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2485,6 +2485,160 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test expected document and rules print in notification message`() { + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "Test message", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = DocLevelQuery(query = "\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + // Prints all fields in doc source + val scriptSource1 = """ + Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Document values + {{#sample_documents}}\n + Test field: {{_source.test_field}}\n + Message: {{_source.message}}\n + Timestamp: {{_source.test_strict_date_time}}\n + {{/sample_documents}}\n + \n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Only prints a few fields from the doc source + val scriptSource2 = """ + Monitor {{ctx.monitor.name}} just entered alert status. 
Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Document values + {{#sample_documents}}\n + Test field: {{_source.test_field}}\n + Message: {{_source.message}}\n + {{/sample_documents}}\n + \n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Doesn't print any document data + val scriptSource3 = """ + Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Using 'alert.copy()' here because 'randomAction()' applies the 'template' for the message subject, and message body + val actions = listOf( + randomAction(name = "action1", template = randomTemplateScript("action1 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource1)), + randomAction(name = "action2", template = randomTemplateScript("action2 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource2)), + randomAction(name = "action3", template = randomTemplateScript("action3 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource3)) + ) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)) + ) + ) + + indexDoc(index, "", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results") + assertEquals(1, triggerResults.values.size) + + val expectedMessageContents = mapOf( + "action1" to Pair( + // First item in pair is INCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Timestamp: $testTime", + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf() + ), + "action2" to Pair( + // First item in pair is INCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf("Timestamp: $testTime") + ), + "action3" to Pair( + // First item in pair is INCLUDED content + listOf( + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Timestamp: $testTime", + ) + ), + ) + val actionResults = triggerResults.values.first().objectMap("action_results").values.first().values + @Suppress("UNCHECKED_CAST") + actionResults.forEach { action -> + val messageContent = ((action as Map)["output"] as Map)["message"] as String + expectedMessageContents[action["name"]]!!.first.forEach { + assertTrue(messageContent.contains(it)) + } + expectedMessageContents[action["name"]]!!.second.forEach { + 
assertFalse(messageContent.contains(it)) + } + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt new file mode 100644 index 000000000..baad265af --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt @@ -0,0 +1,179 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.alerting.randomAction +import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomChainedAlertTrigger +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelTrigger +import org.opensearch.alerting.randomTemplateScript +import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.commons.alerting.model.AlertContext +import org.opensearch.test.OpenSearchTestCase + +class AlertingUtilsTests : OpenSearchTestCase() { + fun `test parseSampleDocTags only returns expected tags`() { + val expectedDocSourceTags = (0..3).map { "field$it" } + val unexpectedDocSourceTags = ((expectedDocSourceTags.size + 1)..(expectedDocSourceTags.size + 5)) + .map { "field$it" } + + val unexpectedTagsScriptSource = unexpectedDocSourceTags.joinToString { field -> "$field = {{$field}}" } + val expectedTagsScriptSource = unexpectedTagsScriptSource + """ + ${unexpectedDocSourceTags.joinToString("\n") { field -> "$field = {{$field}}" }} + {{#alerts}} + {{#${AlertContext.SAMPLE_DOCS_FIELD}}} + ${expectedDocSourceTags.joinToString("\n") { field -> "$field = {{_source.$field}}" }} + {{/${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/alerts}} + """.trimIndent() + + // Action that prints doc source data + val trigger1 = randomDocumentLevelTrigger( + actions = listOf(randomAction(template = randomTemplateScript(source = expectedTagsScriptSource))) + ) + + // Action that does not print doc source data + val trigger2 = randomDocumentLevelTrigger( + actions = listOf(randomAction(template = randomTemplateScript(source = unexpectedTagsScriptSource))) + ) + + // No actions + val trigger3 = randomDocumentLevelTrigger(actions = listOf()) + + val tags = parseSampleDocTags(listOf(trigger1, trigger2, trigger3)) + + assertEquals(expectedDocSourceTags.size, tags.size) + expectedDocSourceTags.forEach { tag -> assertTrue(tags.contains(tag)) } + unexpectedDocSourceTags.forEach { tag -> assertFalse(tags.contains(tag)) } + } + + fun `test printsSampleDocData entire ctx tag returns TRUE`() { + val tag = "{{ctx}}" + val triggers = listOf( + randomBucketLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomDocumentLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData entire alerts tag returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = "{{ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}}" + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + 
template = randomTemplateScript( + source = "{{ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}}" + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData entire sample_docs tag returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + {{${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + {{${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData sample_docs iteration block returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}" + {{_source.field}} + "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}" + {{/ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + {{#${AlertContext.SAMPLE_DOCS_FIELD}}} + {{_source.field}} + {{/${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData unrelated tag returns FALSE`() { + val tag = "{{ctx.monitor.name}}" + val triggers = listOf( + randomBucketLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomDocumentLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData unsupported trigger types return FALSE`() { + val tag = "{{ctx}}" + val triggers = listOf( + randomQueryLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomChainedAlertTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } + } +} From 8cb01683dae351e79fe856370a3763c1161213bd Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 11 Mar 2024 15:43:14 -0700 Subject: [PATCH 07/16] Refactored logic after making AlertContext a separate class from Alert instead of inheriting/extending it in common utils. 
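For reviewers, a minimal sketch of the composition-over-inheritance shape this refactor moves toward. The enrichAlerts helper name below is illustrative only and is not part of this patch; the real call sites are in the diff that follows, and AlertContext still lives in common-utils at this point in the series.

    import org.opensearch.alerting.util.getBucketKeysHash
    import org.opensearch.commons.alerting.model.Alert
    import org.opensearch.commons.alerting.model.AlertContext

    // Wrap each Alert in an AlertContext instead of treating AlertContext as an Alert subtype.
    // Sample docs are attached per aggregation bucket when available, otherwise left empty.
    fun enrichAlerts(
        alerts: List<Alert>,
        sampleDocsByBucket: Map<String, List<Map<String, Any>>>
    ): List<AlertContext> = alerts.map { alert ->
        val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
        AlertContext(
            alert = alert,
            sampleDocs = bucketKey?.let { sampleDocsByBucket[it] } ?: listOf()
        )
    }
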
Signed-off-by: AWSHurneyt --- .../alerting/BucketLevelMonitorRunner.kt | 16 ++++++++-------- .../alerting/DocumentLevelMonitorRunner.kt | 8 ++++---- .../script/BucketLevelTriggerExecutionContext.kt | 5 +++-- .../DocumentLevelTriggerExecutionContext.kt | 6 +++--- .../alerting/MonitorRunnerServiceIT.kt | 4 ---- 5 files changed, 18 insertions(+), 21 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 947b22af4..997190ded 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -320,7 +320,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { for (alertCategory in actionExecutionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { - val alertContext = if (alertCategory != AlertCategory.NEW) alert + val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert) else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs) val actionCtx = getActionContextForAlertCategory( @@ -550,37 +550,37 @@ object BucketLevelMonitorRunner : MonitorRunner() { private fun getActionContextForAlertCategory( alertCategory: AlertCategory, - alert: Alert, + alertContext: AlertContext, ctx: BucketLevelTriggerExecutionContext, error: Exception? ): BucketLevelTriggerExecutionContext { return when (alertCategory) { AlertCategory.DEDUPED -> - ctx.copy(dedupedAlerts = listOf(alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) + ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) AlertCategory.NEW -> - ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alert), completedAlerts = emptyList(), error = error) + ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error) AlertCategory.COMPLETED -> - ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error) + ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error) } } private fun getAlertContext( alert: Alert, alertSampleDocs: Map>>> - ): Alert { + ): AlertContext { val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash() val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey) return if (!bucketKey.isNullOrEmpty() && !sampleDocs.isNullOrEmpty()) { AlertContext(alert = alert, sampleDocs = sampleDocs) } else { logger.warn( - "Error retrieving sample documents for alert {} from trigger {} of monitor {} during execution {}.", + "Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.", alert.id, alert.triggerId, alert.monitorId, alert.executionId ) - alert + AlertContext(alert = alert, sampleDocs = listOf()) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 637f7096c..db3a141a0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -516,10 +516,10 @@ class DocumentLevelMonitorRunner : 
MonitorRunner() { for (action in trigger.actions) { val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { - for (alert in alertContexts) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) - triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } - triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) + for (alertContext in alertContexts) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alertContext)), monitorCtx, monitor, dryrun) + triggerResult.actionResultsMap.getOrPut(alertContext.alert.id) { mutableMapOf() } + triggerResult.actionResultsMap[alertContext.alert.id]?.set(action.id, actionResults) } } else if (alertContexts.isNotEmpty()) { val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index d0e90fa24..f7e04bf8c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant @@ -22,7 +23,7 @@ data class BucketLevelTriggerExecutionContext( override val periodStart: Instant, override val periodEnd: Instant, val dedupedAlerts: List = listOf(), - val newAlerts: List = listOf(), + val newAlerts: List = listOf(), val completedAlerts: List = listOf(), override val error: Exception? 
= null ) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { @@ -32,7 +33,7 @@ data class BucketLevelTriggerExecutionContext( trigger: BucketLevelTrigger, monitorRunResult: MonitorRunResult, dedupedAlerts: List = listOf(), - newAlerts: List = listOf(), + newAlerts: List = listOf(), completedAlerts: List = listOf() ) : this( monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt index 8035d8f58..35031c632 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.script -import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant @@ -16,7 +16,7 @@ data class DocumentLevelTriggerExecutionContext( override val results: List>, override val periodStart: Instant, override val periodEnd: Instant, - val alerts: List = listOf(), + val alerts: List = listOf(), val triggeredDocs: List, val relatedFindings: List, override val error: Exception? = null @@ -25,7 +25,7 @@ data class DocumentLevelTriggerExecutionContext( constructor( monitor: Monitor, trigger: DocumentLevelTrigger, - alerts: List = listOf() + alerts: List = listOf() ) : this( monitor, trigger, emptyList(), Instant.now(), Instant.now(), alerts, emptyList(), emptyList(), null diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 21cefddf3..c59bac0eb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -25,7 +25,6 @@ import org.opensearch.commons.alerting.model.Alert.State.ACKNOWLEDGED import org.opensearch.commons.alerting.model.Alert.State.ACTIVE import org.opensearch.commons.alerting.model.Alert.State.COMPLETED import org.opensearch.commons.alerting.model.Alert.State.ERROR -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery @@ -1507,7 +1506,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) - assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 1, findings.size) @@ -1577,7 +1575,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) - assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for 
test monitor", 1, findings.size) @@ -1648,7 +1645,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 0) - assertFalse("Alert documents should not be an AlertContext.", alert is AlertContext) } val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 0, findings.size) From 85cf9f12f1e092d20f4e8ba9bd382e577fb6da2d Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 11 Mar 2024 17:21:14 -0700 Subject: [PATCH 08/16] Moved AlertContext data model from common utils to alerting plugin. Signed-off-by: AWSHurneyt --- .../alerting/BucketLevelMonitorRunner.kt | 2 +- .../alerting/DocumentLevelMonitorRunner.kt | 2 +- .../opensearch/alerting/model/AlertContext.kt | 49 +++++++++++++++++++ .../BucketLevelTriggerExecutionContext.kt | 2 +- .../DocumentLevelTriggerExecutionContext.kt | 2 +- .../opensearch/alerting/util/AlertingUtils.kt | 2 +- .../org/opensearch/alerting/TestHelpers.kt | 20 ++++++++ .../alerting/model/AlertContextTests.kt | 39 +++++++++++++++ .../alerting/util/AlertingUtilsTests.kt | 2 +- 9 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 997190ded..3c6d985f4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorRunResult @@ -31,7 +32,6 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index db3a141a0..d32f4789e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -19,6 +19,7 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults @@ 
-47,7 +48,6 @@ import org.opensearch.commons.alerting.action.PublishFindingsRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.Alert -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt new file mode 100644 index 000000000..d94d2af57 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelQuery + +/** + * This model is a wrapper for [Alert] that should only be used to create a more + * informative alert object to enrich mustache template notification messages. + */ +data class AlertContext( + val alert: Alert, + val associatedQueries: List? = null, + val sampleDocs: List>? = null +) { + fun asTemplateArg(): Map { + val queriesContext = associatedQueries?.map { + mapOf( + DocLevelQuery.QUERY_ID_FIELD to it.id, + DocLevelQuery.NAME_FIELD to it.name, + DocLevelQuery.TAGS_FIELD to it.tags + ) + } + + // Compile the custom context fields. + val customContextFields = mapOf( + ASSOCIATED_QUERIES_FIELD to queriesContext, + SAMPLE_DOCS_FIELD to sampleDocs + ) + + // Get the alert template args + val templateArgs = alert.asTemplateArg().toMutableMap() + + // Add the non-null custom context fields to the alert templateArgs. 
+ customContextFields.forEach { (key, value) -> + value?.let { templateArgs[key] = it } + } + return templateArgs + } + + companion object { + const val ASSOCIATED_QUERIES_FIELD = "associated_queries" + const val SAMPLE_DOCS_FIELD = "sample_documents" + } +} \ No newline at end of file diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index f7e04bf8c..597ff5b3e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -6,10 +6,10 @@ package org.opensearch.alerting.script import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.Alert -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt index 35031c632..543e6bdf7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.script -import org.opensearch.commons.alerting.model.AlertContext +import org.opensearch.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 9087631b7..ba1ad261d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext @@ -15,7 +16,6 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.commons.alerting.model.AggregationResultBucket -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 143a77afd..e99c9635e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -9,6 +9,7 @@ import junit.framework.TestCase.assertNull 
import org.apache.hc.core5.http.Header import org.apache.hc.core5.http.HttpEntity import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -782,3 +783,22 @@ fun randomChainedAlertTrigger( } else actions ) } + +fun randomAlertContext( + alert: Alert = randomAlert(), + associatedQueries: List? = (-1..2).random().takeIf { it != -1 }?.let { + (0..it).map { randomDocLevelQuery() } + }, + sampleDocs: List>? = (-1..2).random().takeIf { it != -1 }?.let { + (0..it).map { + // Using 'randomFinding' to mimic documents in an index. + randomFinding().asTemplateArg() + } + } +): AlertContext { + return AlertContext( + alert = alert, + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt new file mode 100644 index 000000000..97efad9e1 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.randomAlertContext +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.test.OpenSearchTestCase + +class AlertContextTests : OpenSearchTestCase() { + + fun `test AlertContext asTemplateArg`() { + val alertContext: AlertContext = randomAlertContext() + val templateArgs = alertContext.asTemplateArg() + + assertEquals("Template args id does not match", templateArgs[Alert.ALERT_ID_FIELD], alertContext.alert.id) + assertEquals("Template args version does not match", templateArgs[Alert.ALERT_VERSION_FIELD], alertContext.alert.version) + assertEquals("Template args state does not match", templateArgs[Alert.STATE_FIELD], alertContext.alert.state.toString()) + assertEquals("Template args error message does not match", templateArgs[Alert.ERROR_MESSAGE_FIELD], alertContext.alert.errorMessage) + assertEquals("Template args acknowledged time does not match", templateArgs[Alert.ACKNOWLEDGED_TIME_FIELD], null) + assertEquals("Template args end time does not", templateArgs[Alert.END_TIME_FIELD], alertContext.alert.endTime?.toEpochMilli()) + assertEquals("Template args start time does not", templateArgs[Alert.START_TIME_FIELD], alertContext.alert.startTime.toEpochMilli()) + assertEquals("Template args last notification time does not match", templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null) + assertEquals("Template args severity does not match", templateArgs[Alert.SEVERITY_FIELD], alertContext.alert.severity) + assertEquals("Template args clusters does not match", templateArgs[Alert.CLUSTERS_FIELD], alertContext.alert.clusters?.joinToString(",")) + val formattedQueries = alertContext.associatedQueries?.map { + mapOf( + DocLevelQuery.QUERY_ID_FIELD to it.id, + DocLevelQuery.NAME_FIELD to it.name, + DocLevelQuery.TAGS_FIELD to it.tags + ) + } + assertEquals("Template associated queries do not match", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD], formattedQueries) + assertEquals("Template args sample docs do not match", templateArgs[AlertContext.SAMPLE_DOCS_FIELD], alertContext.sampleDocs) + } +} \ No newline at end of 
file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt index baad265af..31dcb6591 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt @@ -5,6 +5,7 @@ package org.opensearch.alerting.util +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomBucketLevelTrigger import org.opensearch.alerting.randomChainedAlertTrigger @@ -13,7 +14,6 @@ import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext -import org.opensearch.commons.alerting.model.AlertContext import org.opensearch.test.OpenSearchTestCase class AlertingUtilsTests : OpenSearchTestCase() { From e54acac349cb24c2ff27edfd176287e0a25d295d Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 11 Mar 2024 17:28:50 -0700 Subject: [PATCH 09/16] Fixed ktlint errors. Signed-off-by: AWSHurneyt --- .../main/kotlin/org/opensearch/alerting/model/AlertContext.kt | 2 +- .../kotlin/org/opensearch/alerting/model/AlertContextTests.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt index d94d2af57..f981691c8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt @@ -46,4 +46,4 @@ data class AlertContext( const val ASSOCIATED_QUERIES_FIELD = "associated_queries" const val SAMPLE_DOCS_FIELD = "sample_documents" } -} \ No newline at end of file +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt index 97efad9e1..fbd768a01 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt @@ -36,4 +36,4 @@ class AlertContextTests : OpenSearchTestCase() { assertEquals("Template associated queries do not match", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD], formattedQueries) assertEquals("Template args sample docs do not match", templateArgs[AlertContext.SAMPLE_DOCS_FIELD], alertContext.sampleDocs) } -} \ No newline at end of file +} From f3bea23ea76010d68ca6a4074dd78b52d1a97e9b Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Mon, 11 Mar 2024 18:11:38 -0700 Subject: [PATCH 10/16] Added additional unit tests. 
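The new tests exercise AlertContext.asTemplateArg() across null, empty, and populated combinations of associatedQueries and sampleDocs. For context, a minimal sketch of the behavior under test, written against the existing TestHelpers builders (randomAlert, randomDocLevelQuery, randomFinding) and intended to sit inside a test class; it is not part of this patch:

    // Template args keep the wrapped Alert's fields and add the custom context
    // fields only when they are non-null.
    val alertContext = AlertContext(
        alert = randomAlert(),
        associatedQueries = listOf(randomDocLevelQuery()),
        sampleDocs = listOf(randomFinding().asTemplateArg())
    )
    val templateArgs = alertContext.asTemplateArg()
    // templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] is a list of {id, name, tags} maps.
    // templateArgs[AlertContext.SAMPLE_DOCS_FIELD] is the sample doc list, passed through as-is.
    // When associatedQueries or sampleDocs is null, the corresponding key is omitted entirely.
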
Signed-off-by: AWSHurneyt --- .../alerting/model/AlertContextTests.kt | 346 +++++++++++++++++- 1 file changed, 332 insertions(+), 14 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt index fbd768a01..5555f25a1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt @@ -6,34 +6,352 @@ package org.opensearch.alerting.model import org.opensearch.alerting.randomAlertContext +import org.opensearch.alerting.randomDocLevelQuery +import org.opensearch.alerting.randomFinding import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.test.OpenSearchTestCase +@Suppress("UNCHECKED_CAST") class AlertContextTests : OpenSearchTestCase() { - fun `test AlertContext asTemplateArg`() { - val alertContext: AlertContext = randomAlertContext() + fun `test AlertContext asTemplateArg with null associatedQueries and null sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and 0 sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs!!.size}", + sampleDocs!!.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and 1 sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and multiple sampleDocs`() { + val associatedQueries: List? 
= null + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + val templateArgs = alertContext.asTemplateArg() - assertEquals("Template args id does not match", templateArgs[Alert.ALERT_ID_FIELD], alertContext.alert.id) - assertEquals("Template args version does not match", templateArgs[Alert.ALERT_VERSION_FIELD], alertContext.alert.version) - assertEquals("Template args state does not match", templateArgs[Alert.STATE_FIELD], alertContext.alert.state.toString()) - assertEquals("Template args error message does not match", templateArgs[Alert.ERROR_MESSAGE_FIELD], alertContext.alert.errorMessage) - assertEquals("Template args acknowledged time does not match", templateArgs[Alert.ACKNOWLEDGED_TIME_FIELD], null) - assertEquals("Template args end time does not", templateArgs[Alert.END_TIME_FIELD], alertContext.alert.endTime?.toEpochMilli()) - assertEquals("Template args start time does not", templateArgs[Alert.START_TIME_FIELD], alertContext.alert.startTime.toEpochMilli()) + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and null sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and null sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List>? 
= null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and null sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and 1 sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have 
size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and multiple sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, 
templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and 1 sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and multiple sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + private fun assertAlertIsEqual(alertContext: AlertContext, templateArgs: Map) { + assertEquals("Template args id does not match", alertContext.alert.id, templateArgs[Alert.ALERT_ID_FIELD]) + assertEquals("Template args version does not match", alertContext.alert.version, templateArgs[Alert.ALERT_VERSION_FIELD]) + assertEquals("Template args state does not match", alertContext.alert.state.toString(), templateArgs[Alert.STATE_FIELD]) + assertEquals("Template args error message does not match", alertContext.alert.errorMessage, templateArgs[Alert.ERROR_MESSAGE_FIELD]) + assertEquals("Template args acknowledged time does not match", null, templateArgs[Alert.ACKNOWLEDGED_TIME_FIELD]) + assertEquals("Template args end time does not", alertContext.alert.endTime?.toEpochMilli(), templateArgs[Alert.END_TIME_FIELD]) + assertEquals("Template args start time does not", alertContext.alert.startTime.toEpochMilli(), templateArgs[Alert.START_TIME_FIELD]) assertEquals("Template args last notification time does not match", templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null) - assertEquals("Template args severity does not match", templateArgs[Alert.SEVERITY_FIELD], alertContext.alert.severity) - assertEquals("Template 
args clusters does not match", templateArgs[Alert.CLUSTERS_FIELD], alertContext.alert.clusters?.joinToString(",")) - val formattedQueries = alertContext.associatedQueries?.map { + assertEquals("Template args severity does not match", alertContext.alert.severity, templateArgs[Alert.SEVERITY_FIELD]) + assertEquals("Template args clusters does not match", alertContext.alert.clusters?.joinToString(","), templateArgs[Alert.CLUSTERS_FIELD]) + } + + private fun formatAssociatedQueries(alertContext: AlertContext): List>? { + return alertContext.associatedQueries?.map { mapOf( DocLevelQuery.QUERY_ID_FIELD to it.id, DocLevelQuery.NAME_FIELD to it.name, DocLevelQuery.TAGS_FIELD to it.tags ) } - assertEquals("Template associated queries do not match", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD], formattedQueries) - assertEquals("Template args sample docs do not match", templateArgs[AlertContext.SAMPLE_DOCS_FIELD], alertContext.sampleDocs) } } From a1dc077e50bce881693c1afb8eaed4a503a1a980 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 12 Mar 2024 16:55:34 -0700 Subject: [PATCH 11/16] Extracted sample doc aggs logic into helper function. Added support for sorting sample docs based on metric aggregations. Signed-off-by: AWSHurneyt --- .../alerting/util/AggregationQueryRewriter.kt | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index 5a211464d..0486be8a9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting.util import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.TriggerAfterKey +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Trigger import org.opensearch.search.aggregations.AggregationBuilder @@ -16,6 +17,7 @@ import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.SingleBucketAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder import org.opensearch.search.aggregations.support.AggregationPath import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.fetch.subphase.FetchSourceContext @@ -65,14 +67,7 @@ class AggregationQueryRewriter { // TODO: Returning sample documents should ideally be a toggleable option at the action level. // For now, identify which fields to return from the doc _source for the trigger's actions. 
val docFieldTags = parseSampleDocTags(listOf(trigger)) - val sampleDocsAgg = listOf( - AggregationBuilders.topHits("low_hits") - .size(5) - .sort("_score", SortOrder.ASC), - AggregationBuilders.topHits("top_hits") - .size(5) - .sort("_score", SortOrder.DESC) - ) + val sampleDocsAgg = getSampleDocAggs(factory) sampleDocsAgg.forEach { agg -> if (docFieldTags.isNotEmpty()) agg.fetchSource(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg) @@ -140,5 +135,25 @@ class AggregationQueryRewriter { } return bucketLevelTriggerAfterKeys } + + @Suppress("UNCHECKED_CAST") + private fun getSampleDocAggs(factory: CompositeAggregationBuilder): List { + var defaultSortFields = listOf("_score") + val aggregations = factory.subAggregations.flatMap { + (it.convertToMap()[it.name] as Map).values.flatMap { field -> + field as Map + field.values + } + } + if (aggregations.isNotEmpty()) defaultSortFields = aggregations + + val lowHitsAgg = AggregationBuilders.topHits("low_hits").size(5) + val topHitsAgg = AggregationBuilders.topHits("top_hits").size(5) + defaultSortFields.forEach { + lowHitsAgg.sort(it, SortOrder.ASC) + topHitsAgg.sort(it, SortOrder.DESC) + } + return listOf(lowHitsAgg, topHitsAgg) + } } } From b3c4f4b901380eb63a1e08bf8a289be40e86b683 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 12 Mar 2024 16:57:08 -0700 Subject: [PATCH 12/16] Extracted get sample doc logic into helper function. Added sorting for sample docs. Signed-off-by: AWSHurneyt --- .../alerting/BucketLevelMonitorRunner.kt | 100 +++++++++++------- 1 file changed, 59 insertions(+), 41 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 3c6d985f4..3b9e1e355 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -29,6 +29,7 @@ import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.client.Client import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert @@ -237,9 +238,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { // Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data. 
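                // "Triggered" here means the trigger produced at least one NEW alert during this run, and
                // printsSampleDocData is assumed to check whether any of the trigger's actions reference sample
                // documents in their message templates; the extra sample-doc search below is skipped otherwise.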
val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty() - @Suppress("UNCHECKED_CAST") if (isTriggered && printsSampleDocData(trigger)) { - val sampleDocumentsByBucket = mutableMapOf>>() try { val searchRequest = monitorCtx.inputService!!.getSearchRequest( monitor = monitor.copy(triggers = listOf(trigger)), @@ -250,45 +249,13 @@ object BucketLevelMonitorRunner : MonitorRunner() { matchingDocIdsPerIndex = null, returnSampleDocs = true ) - - val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) } - val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf()) as Map - val compositeAgg = aggs.getOrDefault("composite_agg", mapOf()) as Map - val buckets = compositeAgg.getOrDefault("buckets", emptyList>()) as List> - - buckets.forEach { bucket -> - val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf()) as Map).values.toList()) - if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.") - - val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf()) as Map) - .getOrDefault("hits", mapOf()) as Map - val topHits = unwrappedTopHits.getOrDefault("hits", listOf>()) as List> - - val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf()) as Map) - .getOrDefault("hits", mapOf()) as Map - val lowHits = unwrappedLowHits.getOrDefault("hits", listOf>()) as List> - - val allHits = topHits + lowHits - - if (allHits.isEmpty()) { - // We expect sample documents to be available for each bucket. - logger.error("Sample documents not found for trigger {} of monitor {}.", trigger.id, monitor.id) - } - - // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each. - // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data. 
- val uniqueHitIds = mutableSetOf() - val dedupedHits = mutableListOf>() - allHits.forEach { hit -> - val hitId = hit["_id"] as String - if (!uniqueHitIds.contains(hitId)) { - uniqueHitIds.add(hitId) - dedupedHits.add(hit) - } - } - sampleDocumentsByBucket[bucketKey] = dedupedHits - } - + monitorCtx.client + val sampleDocumentsByBucket = getSampleDocs( + client = monitorCtx.client!!, + monitorId = monitor.id, + triggerId = trigger.id, + searchRequest = searchRequest + ) alertSampleDocs[trigger.id] = sampleDocumentsByBucket } catch (e: Exception) { logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e) @@ -583,4 +550,55 @@ object BucketLevelMonitorRunner : MonitorRunner() { AlertContext(alert = alert, sampleDocs = listOf()) } } + + @Suppress("UNCHECKED_CAST") + private suspend fun getSampleDocs( + client: Client, + monitorId: String, + triggerId: String, + searchRequest: SearchRequest + ): Map>> { + val sampleDocumentsByBucket = mutableMapOf>>() + + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf()) as Map + val compositeAgg = aggs.getOrDefault("composite_agg", mapOf()) as Map + val buckets = compositeAgg.getOrDefault("buckets", emptyList>()) as List> + + buckets.forEach { bucket -> + val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf()) as Map).values.toList()) + if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.") + + val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val topHits = unwrappedTopHits.getOrDefault("hits", listOf>()) as List> + + val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val lowHits = unwrappedLowHits.getOrDefault("hits", listOf>()) as List> + + // Reversing the order of lowHits so allHits will be in descending order. + val allHits = topHits + lowHits.reversed() + + if (allHits.isEmpty()) { + // We expect sample documents to be available for each bucket. + logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId) + } + + // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each. + // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data. + val uniqueHitIds = mutableSetOf() + val dedupedHits = mutableListOf>() + allHits.forEach { hit -> + val hitId = hit["_id"] as String + if (!uniqueHitIds.contains(hitId)) { + uniqueHitIds.add(hitId) + dedupedHits.add(hit) + } + } + sampleDocumentsByBucket[bucketKey] = dedupedHits + } + + return sampleDocumentsByBucket + } } From fddb1163c0fa46cfdbfb8fc3e3fe7bc67bb3c771 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 13 Mar 2024 13:37:56 -0700 Subject: [PATCH 13/16] Removed dev code. 
Signed-off-by: AWSHurneyt --- .../kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 3b9e1e355..193c3db06 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -249,7 +249,6 @@ object BucketLevelMonitorRunner : MonitorRunner() { matchingDocIdsPerIndex = null, returnSampleDocs = true ) - monitorCtx.client val sampleDocumentsByBucket = getSampleDocs( client = monitorCtx.client!!, monitorId = monitor.id, From 4bd853c26ccd6da046c7e41cc4e20bc7f7ab72e9 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 13 Mar 2024 13:40:47 -0700 Subject: [PATCH 14/16] Fixed ktlint errors. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/InputService.kt | 7 +- .../alerting/util/AggregationQueryRewriter.kt | 15 ++++- .../alerting/DocumentMonitorRunnerIT.kt | 7 +- .../alerting/model/AlertContextTests.kt | 66 +++++++++++++++---- 4 files changed, 79 insertions(+), 16 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 279c20b44..5f4941229 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -253,7 +253,12 @@ class InputService( // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly // which causes a strange bug where the rewritten query persists on the Monitor across executions - val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(searchInput.query), prevResult, monitor.triggers, returnSampleDocs) + val rewrittenQuery = AggregationQueryRewriter.rewriteQuery( + deepCopyQuery(searchInput.query), + prevResult, + monitor.triggers, + returnSampleDocs + ) // Rewrite query to consider the doc ids per given index if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index 0486be8a9..3989bd384 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -37,7 +37,12 @@ class AggregationQueryRewriter { /** * Optionally adds support for returning sample documents for each bucket of data returned for a bucket level monitor. 
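     * When [returnSampleDocs] is true, each composite aggregation is expected to receive two top_hits
     * sub-aggregations ("top_hits" and "low_hits", up to 5 documents each, sorted descending and ascending
     * respectively on the fields used by the monitor's metric aggregations, or on _score when there are none)
     * so the monitor runner can surface representative documents for every bucket.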
*/ - fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List, returnSampleDocs: Boolean = false): SearchSourceBuilder { + fun rewriteQuery( + query: SearchSourceBuilder, + prevResult: InputRunResults?, + triggers: List, + returnSampleDocs: Boolean = false + ): SearchSourceBuilder { triggers.forEach { trigger -> if (trigger is BucketLevelTrigger) { // add bucket selector pipeline aggregation for each trigger in query @@ -69,7 +74,13 @@ class AggregationQueryRewriter { val docFieldTags = parseSampleDocTags(listOf(trigger)) val sampleDocsAgg = getSampleDocAggs(factory) sampleDocsAgg.forEach { agg -> - if (docFieldTags.isNotEmpty()) agg.fetchSource(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) + if (docFieldTags.isNotEmpty()) agg.fetchSource( + FetchSourceContext( + true, + docFieldTags.toTypedArray(), + emptyArray() + ) + ) if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg) } } else { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index c320b90a9..83f5d5b76 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2447,7 +2447,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) val actions = (0..randomInt(10)).map { randomActionWithPolicy( - template = randomTemplateScript("{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}"), + template = randomTemplateScript( + "{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}" + ), destinationId = createDestination().id, actionExecutionPolicy = actionExecutionPolicy ) @@ -2477,7 +2479,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { for (alertActionResult in triggerResult.objectMap("action_results").values) { assertEquals(actions.size, alertActionResult.values.size) for (actionResult in alertActionResult.values) { - @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] as Map + @Suppress("UNCHECKED_CAST") + val actionOutput = (actionResult as Map>)["output"] as Map assertTrue( "The notification message is missing the query name.", actionOutput["message"]!!.contains("(name=${docQuery.name})") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt index 5555f25a1..0f002ba22 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt @@ -106,7 +106,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) } @@ -126,7 +130,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, 
(templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) } @@ -146,7 +154,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) } @@ -166,7 +178,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -192,7 +208,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -218,7 +238,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -244,7 +268,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -270,7 +298,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), 
templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -296,7 +328,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -322,7 +358,11 @@ class AlertContextTests : OpenSearchTestCase() { associatedQueries.size, (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size ) - assertEquals("Template associated queries do not match", formatAssociatedQueries(alertContext), templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) assertEquals( "Template args sample docs should have size ${sampleDocs.size}", @@ -342,7 +382,11 @@ class AlertContextTests : OpenSearchTestCase() { assertEquals("Template args start time does not", alertContext.alert.startTime.toEpochMilli(), templateArgs[Alert.START_TIME_FIELD]) assertEquals("Template args last notification time does not match", templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null) assertEquals("Template args severity does not match", alertContext.alert.severity, templateArgs[Alert.SEVERITY_FIELD]) - assertEquals("Template args clusters does not match", alertContext.alert.clusters?.joinToString(","), templateArgs[Alert.CLUSTERS_FIELD]) + assertEquals( + "Template args clusters does not match", + alertContext.alert.clusters?.joinToString(","), + templateArgs[Alert.CLUSTERS_FIELD] + ) } private fun formatAssociatedQueries(alertContext: AlertContext): List>? { From 235056c334f7988783aa5eee8efcd00fdf86d7da Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 13 Mar 2024 23:29:10 -0700 Subject: [PATCH 15/16] Added comments based on PR feedback. 
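For context, a minimal sketch of how an action message template could consume the sample documents this
series exposes. The mustache keys are assumptions based on AlertContext.SAMPLE_DOCS_FIELD and on the hit
fields ("_id", "_index") returned by the top_hits/low_hits aggregations, and are not part of this change:

    val sampleDocMessageTemplate =
        "{{#ctx.newAlerts}}\n" +
            "Sample documents for alert {{id}}:\n" +
            "{{#sample_documents}}\n" +
            "  - {{_id}} (index: {{_index}})\n" +
            "{{/sample_documents}}\n" +
            "{{/ctx.newAlerts}}"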
Signed-off-by: AWSHurneyt --- .../opensearch/alerting/BucketLevelMonitorRunner.kt | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 193c3db06..4ef74127d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -224,6 +224,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { ?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap)) } + // The alertSampleDocs map structure is Map>> val alertSampleDocs = mutableMapOf>>>() for (trigger in monitor.triggers) { val alertsToUpdate = mutableSetOf() @@ -539,7 +540,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { return if (!bucketKey.isNullOrEmpty() && !sampleDocs.isNullOrEmpty()) { AlertContext(alert = alert, sampleDocs = sampleDocs) } else { - logger.warn( + logger.error( "Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.", alert.id, alert.triggerId, @@ -550,6 +551,12 @@ object BucketLevelMonitorRunner : MonitorRunner() { } } + /** + * Executes the monitor's query with the addition of 2 top_hits aggregations that are used to return the top 5, + * and bottom 5 documents for each bucket. + * + * @return Map> + */ @Suppress("UNCHECKED_CAST") private suspend fun getSampleDocs( client: Client, @@ -558,7 +565,6 @@ object BucketLevelMonitorRunner : MonitorRunner() { searchRequest: SearchRequest ): Map>> { val sampleDocumentsByBucket = mutableMapOf>>() - val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf()) as Map val compositeAgg = aggs.getOrDefault("composite_agg", mapOf()) as Map From e61e602ed826e2a8af0ff1609b4dc04073d1f7ba Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 13 Mar 2024 23:30:24 -0700 Subject: [PATCH 16/16] Added logic to make mGet calls in batches. Signed-off-by: AWSHurneyt --- .../alerting/DocumentLevelMonitorRunner.kt | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index d32f4789e..7262b9260 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -90,11 +90,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * Docs are fetched from the source index per shard and transformed.*/ val transformedDocs = mutableListOf>() - // Maps a finding ID to the concrete index name. - val findingIdToConcreteIndex = mutableMapOf() - - // Maps the docId to the doc source - val docIdToDocMap = mutableMapOf>() + // Maps a finding ID to the related document. 
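+    // Populated by the batched mGet lookups below; each stored item's response (assumed to be a
+    // MultiGetItemResponse) supplies the _source used as the sample document on the alert context.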
+ private val findingIdToDocSource = mutableMapOf() override suspend fun runMonitor( monitor: Monitor, @@ -489,10 +486,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) alerts.add(alert) - val docId = alert.relatedDocIds.first().split("|").first() - val docSource = docIdToDocMap[docId]?.find { item -> - findingIdToConcreteIndex[alert.findingIds.first()] == item.index - }?.response?.convertToMap() + val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap() alertContexts.add( AlertContext( @@ -591,7 +585,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) findingsToTriggeredQueries[finding.id] = triggeredQueries - findingIdToConcreteIndex[finding.id] = finding.index val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -1104,21 +1097,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) { val docFieldTags = parseSampleDocTags(monitor.triggers) val request = MultiGetRequest() - findingToDocPairs.forEach { (_, docIdAndIndex) -> - val docIdAndIndexSplit = docIdAndIndex.split("|") - val docId = docIdAndIndexSplit[0] - val concreteIndex = docIdAndIndexSplit[1] - if (docId.isNotEmpty() && concreteIndex.isNotEmpty()) { - val docItem = MultiGetRequest.Item(concreteIndex, docId) - if (docFieldTags.isNotEmpty()) - docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) - request.add(docItem) + + // Perform mGet request in batches. + findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + batch.forEach { (findingId, docIdAndIndex) -> + val docIdAndIndexSplit = docIdAndIndex.split("|") + val docId = docIdAndIndexSplit[0] + val concreteIndex = docIdAndIndexSplit[1] + if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) { + val docItem = MultiGetRequest.Item(concreteIndex, docId) + if (docFieldTags.isNotEmpty()) + docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) + request.add(docItem) + } + val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } + response.responses.forEach { item -> + findingIdToDocSource[findingId] = item + } } } - val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } - response.responses.forEach { item -> - docIdToDocMap.getOrPut(item.id) { mutableListOf() }.add(item) - } } /**