From 3755993e1e224054ce129f14837e8f8fbea242c3 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 11 Dec 2024 11:13:08 -0800 Subject: [PATCH] optimize execution of workflow consisting of bucket-level followed by doc-level monitors (#1729) Signed-off-by: Subhobrata Dey --- .../alerting/BucketLevelMonitorRunner.kt | 3 +- .../alerting/DocumentLevelMonitorRunner.kt | 6 + .../opensearch/alerting/WorkflowService.kt | 9 +- .../TransportDocLevelMonitorFanOutAction.kt | 130 +++++++++++++++--- .../workflow/CompositeWorkflowRunner.kt | 9 +- .../alerting/MonitorDataSourcesIT.kt | 116 ++++++++++++++++ .../org/opensearch/alerting/TestHelpers.kt | 4 +- .../SampleRemoteMonitorRestHandler.java | 3 + 8 files changed, 248 insertions(+), 32 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 99bedd46f..cc9e6508c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.sort.SortOrder import org.opensearch.transport.TransportService import java.time.Instant import java.util.UUID @@ -479,7 +480,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { val queryBuilder = if (input.query.query() == null) BoolQueryBuilder() else QueryBuilders.boolQuery().must(source.query()) queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues)) - sr.source().query(queryBuilder) + sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC) } sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes( 
getCancelAfterTimeInterval() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index b5b2c4f49..16272fd99 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -139,6 +139,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds + } else { + listOf() + } val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() @@ -226,6 +231,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + findingIdsForMatchingDocIds ) val shards = mutableSetOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt index 32d7971f1..1379a1fe3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -41,15 +41,16 @@ class WorkflowService( * @param chainedMonitors Monitors that have previously executed * @param workflowExecutionId Execution id of the current workflow */ - suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): Map> { + suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): + Pair>, List> { if (chainedMonitors.isEmpty()) - return emptyMap() + return Pair(emptyMap(), listOf()) val dataSources = chainedMonitors[0].dataSources try { val existsResponse: 
IndicesExistsResponse = client.admin().indices().suspendUntil { exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it) } - if (existsResponse.isExists == false) return emptyMap() + if (existsResponse.isExists == false) return Pair(emptyMap(), listOf()) // Search findings index to match id of monitors and workflow execution id val bqb = QueryBuilders.boolQuery() .filter( @@ -83,7 +84,7 @@ class WorkflowService( for (finding in findings) { indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds) } - return indexToRelatedDocIdsMap + return Pair(indexToRelatedDocIdsMap, findings.map { it.id }) } catch (t: Exception) { log.error("Error getting finding doc ids: ${t.message}", t) throw AlertingException.wrap(t) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 60fe2b684..7e6f2ed1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -297,19 +297,37 @@ class TransportDocLevelMonitorFanOutAction createFindings(monitor, docsToQueries, idQueryMap, true) } } else { - monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTrigger( - monitorResult, - it as DocumentLevelTrigger, - monitor, - idQueryMap, - docsToQueries, - queryToDocIds, - dryrun, - executionId = executionId, - findingIdToDocSource, - workflowRunContext = workflowRunContext - ) + /** + * if monitor.shouldCreateSingleAlertForFindings is null or false, each doc-level trigger generates alerts; + * if it is true, each doc-level trigger generates a single alert with multiple findings. 
+ */ + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTrigger( + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryToDocIds, + dryrun, + executionId = executionId, + findingIdToDocSource, + workflowRunContext = workflowRunContext + ) + } + } else if (monitor.shouldCreateSingleAlertForFindings == true) { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert( + monitorResult, + it as DocumentLevelTrigger, + monitor, + queryToDocIds, + dryrun, + executionId, + workflowRunContext + ) + } } } @@ -349,6 +367,58 @@ class TransportDocLevelMonitorFanOutAction } } + /** + * Runs a doc-level trigger, skipping findings and individual alerts; if docs match, composes one grouped alert. + */ + private suspend fun runForEachDocTriggerCreateSingleGroupedAlert( + monitorResult: MonitorRunResult, + trigger: DocumentLevelTrigger, + monitor: Monitor, + queryToDocIds: Map>, + dryrun: Boolean, + executionId: String, + workflowRunContext: WorkflowRunContext? 
+ ): DocumentLevelTriggerRunResult { + val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) + if (triggerResult.triggeredDocs.isNotEmpty()) { + val findingIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds + } else { + listOf() + } + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) + val alert = alertService.composeDocLevelAlert( + findingIds!!, + triggerResult.triggeredDocs, + triggerCtx, + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext + ) + for (action in trigger.actions) { + this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun) + } + + if (!dryrun && monitor.id != Monitor.NO_ID) { + val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap()) + val actionExecutionResults = actionResults.values.map { actionRunResult -> + ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0) + } + val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults) + + retryPolicy.let { + alertService.saveAlerts( + monitor.dataSources, + listOf(updatedAlert), + it, + routingId = monitor.id + ) + } + } + } + return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error) + } + private suspend fun runForEachDocTrigger( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, @@ -512,7 +582,11 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding) { + if (shouldCreateFinding and ( + monitor.shouldCreateSingleAlertForFindings == null || + monitor.shouldCreateSingleAlertForFindings == false + ) + ) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -524,13 +598,15 @@ class TransportDocLevelMonitorFanOutAction 
bulkIndexFindings(monitor, indexRequests) } - try { - findings.forEach { finding -> - publishFinding(monitor, finding) + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { + try { + findings.forEach { finding -> + publishFinding(monitor, finding) + } + } catch (e: Exception) { + // suppress exception + log.error("Optional finding callback failed", e) } - } catch (e: Exception) { - // suppress exception - log.error("Optional finding callback failed", e) } this.findingsToTriggeredQueries += findingsToTriggeredQueries @@ -688,6 +764,7 @@ class TransportDocLevelMonitorFanOutAction var to: Long = Long.MAX_VALUE while (to >= from) { val hits: SearchHits = searchShard( + monitor, indexExecutionCtx.concreteIndexName, shard, from, @@ -870,6 +947,7 @@ class TransportDocLevelMonitorFanOutAction * This method hence fetches only docs from shard which haven't been queried before */ private suspend fun searchShard( + monitor: Monitor, index: String, shard: String, prevSeqNo: Long?, @@ -883,8 +961,16 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (!docIds.isNullOrEmpty()) { - boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + } else if (monitor.shouldCreateSingleAlertForFindings == true) { + val docIdsParam = mutableListOf() + if (docIds != null) { + docIdsParam.addAll(docIds) + } + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam)) } val request: SearchRequest = SearchRequest() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt 
b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 366a75e1c..1e613bd0f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { var lastErrorDelegateRun: Exception? = null for (delegate in delegates) { - var indexToDocIds = mapOf>() + var indexToDocIdsWithFindings: Pair>, List>? = Pair(mapOf(), listOf()) var delegateMonitor: Monitor delegateMonitor = monitorsById[delegate.monitorId] ?: throw AlertingException.wrap( @@ -118,7 +118,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { } try { - indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) + indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) } catch (e: Exception) { logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e) return WorkflowRunResult( @@ -131,9 +131,10 @@ object CompositeWorkflowRunner : WorkflowRunner() { workflowId = workflowMetadata.workflowId, workflowMetadataId = workflowMetadata.id, chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, - matchingDocIdsPerIndex = indexToDocIds, + matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first, auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true - else workflow.auditDelegateMonitorAlerts!! 
+ else workflow.auditDelegateMonitorAlerts!!, + findingIds = indexToDocIdsWithFindings.second ) try { dataSources = delegateMonitor.dataSources diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index a790b1736..ae5619879 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { ) ) } + + fun `test execute workflow when bucket monitor is used in chained finding of ignored doc monitor`() { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong + // to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + val bucketCustomAlertsIndex = "custom_alerts_index" + val bucketCustomFindingsIndex = "custom_findings_index" + val bucketCustomFindingsIndexPattern = "custom_findings_index-1" + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = 
listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = bucketCustomAlertsIndex, + findingsIndex = bucketCustomFindingsIndex, + findingsIndexPattern = bucketCustomFindingsIndexPattern + ) + ) + )!! + + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf()) + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf()) + val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3)) + val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val docCustomAlertsIndex = "custom_alerts_index" + val docCustomFindingsIndex = "custom_findings_index" + val docCustomFindingsIndexPattern = "custom_findings_index-1" + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(docTrigger), + dataSources = DataSources( + alertsIndex = docCustomAlertsIndex, + findingsIndex = docCustomFindingsIndex, + findingsIndexPattern = docCustomFindingsIndexPattern + ), + ignoreFindingsAndAlerts = true + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + // 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor) + var workflow = randomWorkflow( + monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id), + enabled = false, + auditDelegateMonitorAlerts = false + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. 
bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4) + // 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth) + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) { + if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) { + val searchResult = monitorRunResults.inputResults.results.first() + + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg") + ?.get("buckets") as List> + assertEquals("Incorrect search result", 3, buckets.size) + + val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) + assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4")) + } else { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + + val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1) + assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4")) + } + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index ee4a5ece3..2330974f4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -216,12 +216,14 @@ fun randomDocumentLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: 
Boolean = false, dataSources: DataSources, + ignoreFindingsAndAlerts: Boolean? = false, owner: String? = null ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, + shouldCreateSingleAlertForFindings = ignoreFindingsAndAlerts, owner = owner ) } diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 9875ef55f..085a8db80 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -95,6 +95,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( @@ -156,6 +157,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( @@ -240,6 +242,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(