From bc90d8f05330cb848cda6c7dbde7779b7455a1ac Mon Sep 17 00:00:00 2001
From: Subhobrata Dey
Date: Wed, 13 Nov 2024 23:07:35 +0000
Subject: [PATCH 1/6] optimize execution of workflow consisting of bucket-level
 followed by doc-level monitors

Signed-off-by: Subhobrata Dey
---
 .../TransportDocLevelMonitorFanOutAction.kt   | 114 +++++++++++++----
 .../alerting/MonitorDataSourcesIT.kt          | 116 ++++++++++++++++++
 .../org/opensearch/alerting/TestHelpers.kt    |   4 +-
 3 files changed, 211 insertions(+), 23 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
index 60fe2b684..6d1ac90cc 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
@@ -297,19 +297,33 @@ class TransportDocLevelMonitorFanOutAction
                 createFindings(monitor, docsToQueries, idQueryMap, true)
             }
         } else {
-            monitor.triggers.forEach {
-                triggerResults[it.id] = runForEachDocTrigger(
-                    monitorResult,
-                    it as DocumentLevelTrigger,
-                    monitor,
-                    idQueryMap,
-                    docsToQueries,
-                    queryToDocIds,
-                    dryrun,
-                    executionId = executionId,
-                    findingIdToDocSource,
-                    workflowRunContext = workflowRunContext
-                )
+            if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
+                monitor.triggers.forEach {
+                    triggerResults[it.id] = runForEachDocTrigger(
+                        monitorResult,
+                        it as DocumentLevelTrigger,
+                        monitor,
+                        idQueryMap,
+                        docsToQueries,
+                        queryToDocIds,
+                        dryrun,
+                        executionId = executionId,
+                        findingIdToDocSource,
+                        workflowRunContext = workflowRunContext
+                    )
+                }
+            } else if (monitor.ignoreFindingsAndAlerts == true) {
+                monitor.triggers.forEach {
+                    triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts(
+                        monitorResult,
+                        it as DocumentLevelTrigger,
+                        monitor,
+                        queryToDocIds,
+                        dryrun,
+                        executionId,
+                        workflowRunContext
+                    )
+                }
             }
         }
 
@@ -349,6 +363,50 @@ class TransportDocLevelMonitorFanOutAction
         }
     }
 
+    private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts(
+        monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
+        trigger: DocumentLevelTrigger,
+        monitor: Monitor,
+        queryToDocIds: Map<DocLevelQuery, Set<String>>,
+        dryrun: Boolean,
+        executionId: String,
+        workflowRunContext: WorkflowRunContext?
+ ): DocumentLevelTriggerRunResult { + val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) + if (triggerResult.triggeredDocs.isNotEmpty()) { + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) + val alert = alertService.composeDocLevelAlert( + listOf(), + triggerResult.triggeredDocs, + triggerCtx, + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext + ) + for (action in trigger.actions) { + this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun) + } + + if (!dryrun && monitor.id != Monitor.NO_ID) { + val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap()) + val actionExecutionResults = actionResults.values.map { actionRunResult -> + ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0) + } + val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults) + + retryPolicy.let { + alertService.saveAlerts( + monitor.dataSources, + listOf(updatedAlert), + it, + routingId = monitor.id + ) + } + } + } + return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error) + } + private suspend fun runForEachDocTrigger( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, @@ -512,7 +570,7 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding) { + if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -524,13 +582,15 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - try { - findings.forEach { finding -> - publishFinding(monitor, finding) + if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + try { + findings.forEach { finding -> + publishFinding(monitor, finding) + } + } catch (e: Exception) { + // suppress exception + log.error("Optional finding callback failed", e) } - } catch (e: Exception) { - // suppress exception - log.error("Optional finding callback failed", e) } this.findingsToTriggeredQueries += findingsToTriggeredQueries @@ -688,6 +748,7 @@ class TransportDocLevelMonitorFanOutAction var to: Long = Long.MAX_VALUE while (to >= from) { val hits: SearchHits = searchShard( + monitor, indexExecutionCtx.concreteIndexName, shard, from, @@ -870,6 +931,7 @@ class TransportDocLevelMonitorFanOutAction * This method hence fetches only docs from shard which haven't been queried before */ private suspend fun searchShard( + monitor: Monitor, index: String, shard: String, prevSeqNo: Long?, @@ -883,8 +945,16 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (!docIds.isNullOrEmpty()) { - boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + } else if (monitor.ignoreFindingsAndAlerts == true) { + val docIdsParam = mutableListOf() + if (docIds != null) { + docIdsParam.addAll(docIds) + } + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", 
docIdsParam)) } val request: SearchRequest = SearchRequest() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index a790b1736..49c648e3c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { ) ) } + + fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong + // to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + val bucketCustomAlertsIndex = "custom_alerts_index" + val bucketCustomFindingsIndex = "custom_findings_index" + val bucketCustomFindingsIndexPattern = "custom_findings_index-1" + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = bucketCustomAlertsIndex, + findingsIndex = bucketCustomFindingsIndex, + findingsIndexPattern = bucketCustomFindingsIndexPattern + ) + ) + )!! + + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf()) + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf()) + val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3)) + val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val docCustomAlertsIndex = "custom_alerts_index" + val docCustomFindingsIndex = "custom_findings_index" + val docCustomFindingsIndexPattern = "custom_findings_index-1" + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(docTrigger), + dataSources = DataSources( + alertsIndex = docCustomAlertsIndex, + findingsIndex = docCustomFindingsIndex, + findingsIndexPattern = docCustomFindingsIndexPattern + ), + ignoreFindingsAndAlerts = true + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + // 1. bucketMonitor (chainedFinding = null) 2. 
docMonitor (chainedFinding = bucketMonitor) + var workflow = randomWorkflow( + monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id), + enabled = false, + auditDelegateMonitorAlerts = false + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4) + // 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth) + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) { + if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) { + val searchResult = monitorRunResults.inputResults.results.first() + + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg") + ?.get("buckets") as List> + assertEquals("Incorrect search result", 3, buckets.size) + + val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) + assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4")) + } else { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + + val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1) + assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4")) + } + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index ee4a5ece3..43272107b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -216,12 +216,14 @@ fun randomDocumentLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false, dataSources: DataSources, + ignoreFindingsAndAlerts: Boolean? = false, owner: String? 
= null
 ): Monitor {
     return Monitor(
         name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
         schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
-        uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
+        uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources,
+        ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner
     )
 }

From 14f937197736148e1ff02a5c59db2b0463814da5 Mon Sep 17 00:00:00 2001
From: Subhobrata Dey
Date: Tue, 19 Nov 2024 03:56:14 +0000
Subject: [PATCH 2/6] sort matching docs by seq_no

Signed-off-by: Subhobrata Dey
---
 .../kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
index 99bedd46f..cc9e6508c 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
 import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
 import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
 import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.search.sort.SortOrder
 import org.opensearch.transport.TransportService
 import java.time.Instant
 import java.util.UUID
@@ -479,7 +480,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
                 val queryBuilder = if (input.query.query() == null) BoolQueryBuilder() else QueryBuilders.boolQuery().must(source.query())
                 queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
-                sr.source().query(queryBuilder)
+                sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)
             }
             sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
                 getCancelAfterTimeInterval()
             )

From 3495cf83ba38ccd8eb5cf49a189474b5bc76846c Mon Sep 17 00:00:00 2001
From: Subhobrata Dey
Date: Tue, 26 Nov 2024 23:09:14 +0000
Subject: [PATCH 3/6] update code-review comments

Signed-off-by: Subhobrata Dey
---
 .../alerting/DocumentLevelMonitorRunner.kt    |  4 ++-
 .../org/opensearch/alerting/InputService.kt   |  2 +-
 .../opensearch/alerting/WorkflowService.kt    |  9 +++---
 .../TransportDocLevelMonitorFanOutAction.kt   | 30 +++++++++++++------
 .../workflow/CompositeWorkflowRunner.kt       |  6 ++--
 .../SampleRemoteMonitorRestHandler.java       |  3 ++
 6 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index b5b2c4f49..267a1be6d 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -138,7 +138,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         }

         // Map of document ids per index when monitor is workflow delegate and has chained findings
-        val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
+        val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first
+        val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second
         val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf() @@ -226,6 +227,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + findingIdsForMatchingDocIds ) val shards = mutableSetOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index c7eeac833..3c8194ceb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -90,7 +90,7 @@ class InputService( periodStart = periodStart, periodEnd = periodEnd, prevResult = prevResult, - matchingDocIdsPerIndex = matchingDocIdsPerIndex, + matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first, returnSampleDocs = false ) val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt index 32d7971f1..1379a1fe3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -41,15 +41,16 @@ class WorkflowService( * @param chainedMonitors Monitors that have previously executed * @param workflowExecutionId Execution id of the current workflow */ - suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): Map> { + suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): + Pair>, List> { if (chainedMonitors.isEmpty()) - return emptyMap() + return Pair(emptyMap(), listOf()) val dataSources = chainedMonitors[0].dataSources try { val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil { exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it) } - if (existsResponse.isExists == false) return emptyMap() + if (existsResponse.isExists == false) return Pair(emptyMap(), listOf()) // Search findings index to match id of monitors and workflow execution id val bqb = QueryBuilders.boolQuery() .filter( @@ -83,7 +84,7 @@ class WorkflowService( for (finding in findings) { indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds) } - return indexToRelatedDocIdsMap + return Pair(indexToRelatedDocIdsMap, findings.map { it.id }) } catch (t: Exception) { log.error("Error getting finding doc ids: ${t.message}", t) throw AlertingException.wrap(t) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 6d1ac90cc..590732da4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -297,7 +297,11 @@ class TransportDocLevelMonitorFanOutAction createFindings(monitor, docsToQueries, idQueryMap, true) } } else { - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + /** + * if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger + * generates a single alert with multiple findings. 
+ */ + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { monitor.triggers.forEach { triggerResults[it.id] = runForEachDocTrigger( monitorResult, @@ -312,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction workflowRunContext = workflowRunContext ) } - } else if (monitor.ignoreFindingsAndAlerts == true) { + } else if (monitor.shouldPersistFindingsAndAlerts == true) { monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts( + triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts( monitorResult, it as DocumentLevelTrigger, monitor, @@ -363,7 +367,10 @@ class TransportDocLevelMonitorFanOutAction } } - private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts( + /** + * run doc-level triggers ignoring findings and alerts and generating a single alert. + */ + private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, monitor: Monitor, @@ -374,9 +381,14 @@ class TransportDocLevelMonitorFanOutAction ): DocumentLevelTriggerRunResult { val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) if (triggerResult.triggeredDocs.isNotEmpty()) { + val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) { + workflowRunContext.matchingDocIdsPerIndex.second + } else { + listOf() + } val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val alert = alertService.composeDocLevelAlert( - listOf(), + findingIds, triggerResult.triggeredDocs, triggerCtx, monitorResult.alertError() ?: triggerResult.alertError(), @@ -570,7 +582,7 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) { + if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -582,7 +594,7 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { try { findings.forEach { finding -> publishFinding(monitor, finding) @@ -945,11 +957,11 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } - } else if (monitor.ignoreFindingsAndAlerts == true) { + } else if (monitor.shouldPersistFindingsAndAlerts == true) { val docIdsParam = mutableListOf() if (docIds != null) { docIdsParam.addAll(docIds) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 366a75e1c..2ca8f978a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ 
b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { var lastErrorDelegateRun: Exception? = null for (delegate in delegates) { - var indexToDocIds = mapOf>() + var indexToDocIdsWithFindings: Pair>, List>? = Pair(mapOf(), listOf()) var delegateMonitor: Monitor delegateMonitor = monitorsById[delegate.monitorId] ?: throw AlertingException.wrap( @@ -118,7 +118,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { } try { - indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) + indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) } catch (e: Exception) { logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e) return WorkflowRunResult( @@ -131,7 +131,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { workflowId = workflowMetadata.workflowId, workflowMetadataId = workflowMetadata.id, chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, - matchingDocIdsPerIndex = indexToDocIds, + matchingDocIdsPerIndex = indexToDocIdsWithFindings!!, auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true else workflow.auditDelegateMonitorAlerts!! ) diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 9875ef55f..085a8db80 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -95,6 +95,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( @@ -156,6 +157,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( @@ -240,6 +242,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( From f6a6375e819ba5d4a15a6906942ce6b5aa25af7c Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 11 Dec 2024 03:18:42 +0000 Subject: [PATCH 4/6] address comments Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 8 ++++-- .../org/opensearch/alerting/InputService.kt | 2 +- .../TransportDocLevelMonitorFanOutAction.kt | 26 +++++++++++-------- .../workflow/CompositeWorkflowRunner.kt | 5 ++-- .../org/opensearch/alerting/TestHelpers.kt | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 267a1be6d..16272fd99 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -138,8 +138,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } // Map of document ids per index when monitor is workflow delegate and has chained 
findings - val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first - val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds + } else { + listOf() + } val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 3c8194ceb..c7eeac833 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -90,7 +90,7 @@ class InputService( periodStart = periodStart, periodEnd = periodEnd, prevResult = prevResult, - matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first, + matchingDocIdsPerIndex = matchingDocIdsPerIndex, returnSampleDocs = false ) val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 590732da4..7e6f2ed1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -301,7 +301,7 @@ class TransportDocLevelMonitorFanOutAction * if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger * generates a single alert with multiple findings. */ - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { monitor.triggers.forEach { triggerResults[it.id] = runForEachDocTrigger( monitorResult, @@ -316,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction workflowRunContext = workflowRunContext ) } - } else if (monitor.shouldPersistFindingsAndAlerts == true) { + } else if (monitor.shouldCreateSingleAlertForFindings == true) { monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts( + triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert( monitorResult, it as DocumentLevelTrigger, monitor, @@ -370,7 +370,7 @@ class TransportDocLevelMonitorFanOutAction /** * run doc-level triggers ignoring findings and alerts and generating a single alert. 
*/ - private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts( + private suspend fun runForEachDocTriggerCreateSingleGroupedAlert( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, monitor: Monitor, @@ -381,14 +381,14 @@ class TransportDocLevelMonitorFanOutAction ): DocumentLevelTriggerRunResult { val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) if (triggerResult.triggeredDocs.isNotEmpty()) { - val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) { - workflowRunContext.matchingDocIdsPerIndex.second + val findingIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds } else { listOf() } val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val alert = alertService.composeDocLevelAlert( - findingIds, + findingIds!!, triggerResult.triggeredDocs, triggerCtx, monitorResult.alertError() ?: triggerResult.alertError(), @@ -582,7 +582,11 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) { + if (shouldCreateFinding and ( + monitor.shouldCreateSingleAlertForFindings == null || + monitor.shouldCreateSingleAlertForFindings == false + ) + ) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -594,7 +598,7 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { try { findings.forEach { finding -> publishFinding(monitor, finding) @@ -957,11 +961,11 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } - } else if (monitor.shouldPersistFindingsAndAlerts == true) { + } else if (monitor.shouldCreateSingleAlertForFindings == true) { val docIdsParam = mutableListOf() if (docIds != null) { docIdsParam.addAll(docIds) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 2ca8f978a..1e613bd0f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -131,9 +131,10 @@ object CompositeWorkflowRunner : WorkflowRunner() { workflowId = workflowMetadata.workflowId, workflowMetadataId = workflowMetadata.id, chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, - matchingDocIdsPerIndex = indexToDocIdsWithFindings!!, + matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first, auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true - else workflow.auditDelegateMonitorAlerts!! 
+ else workflow.auditDelegateMonitorAlerts!!, + findingIds = indexToDocIdsWithFindings.second ) try { dataSources = delegateMonitor.dataSources diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 43272107b..2330974f4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -223,7 +223,7 @@ fun randomDocumentLevelMonitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, - ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner + shouldCreateSingleAlertForFindings = ignoreFindingsAndAlerts, owner = owner ) } From 036f01ca535c82e3a6d7bb1a9db996e51d987750 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 5 Dec 2024 12:09:24 -0500 Subject: [PATCH 5/6] Upgrade to upload-artifact v4 (#1739) * Upgrade to upload-artifact v4 Signed-off-by: Craig Perkins * Upgrade actions/checkout to v4 Signed-off-by: Craig Perkins * Add run-start-commands Signed-off-by: Craig Perkins * Set overwrite to true Signed-off-by: Craig Perkins * Only run with jdk 21 Signed-off-by: Craig Perkins --------- Signed-off-by: Craig Perkins --- .github/workflows/auto-release.yml | 2 +- .github/workflows/bwc-test-workflow.yml | 6 ++++-- .github/workflows/maven-publish.yml | 2 +- .github/workflows/multi-node-test-workflow.yml | 8 ++++---- .github/workflows/security-test-workflow.yml | 4 ++-- .github/workflows/test-workflow.yml | 18 ++++++++++-------- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/.github/workflows/auto-release.yml b/.github/workflows/auto-release.yml index 24eeb2730..93f22c2a3 100644 --- a/.github/workflows/auto-release.yml +++ b/.github/workflows/auto-release.yml @@ -22,7 +22,7 @@ jobs: - name: Get tag id: tag uses: dawidd6/action-get-tag@v1 - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: ncipollo/release-action@v1 with: github_token: ${{ steps.github_app_token.outputs.token }} diff --git a/.github/workflows/bwc-test-workflow.yml b/.github/workflows/bwc-test-workflow.yml index bf8e8ff7c..e6230bce3 100644 --- a/.github/workflows/bwc-test-workflow.yml +++ b/.github/workflows/bwc-test-workflow.yml @@ -29,12 +29,14 @@ jobs: # this image tag is subject to change as more dependencies and updates will arrive over time image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }} # need to switch to root so that github actions can install runner binary on container without permission issues. 
-      options: --user root
+      options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
 
     steps:
+      - name: Run start commands
+        run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
       # This step uses the checkout Github action: https://github.com/actions/checkout
       - name: Checkout Branch
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       # This step uses the setup-java Github action: https://github.com/actions/setup-java
       - name: Set Up JDK ${{ matrix.java }}
         uses: actions/setup-java@v1
diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml
index e822e15cb..55fce8159 100644
--- a/.github/workflows/maven-publish.yml
+++ b/.github/workflows/maven-publish.yml
@@ -27,7 +27,7 @@ jobs:
         with:
           distribution: temurin # Temurin is a distribution of adoptium
           java-version: ${{ matrix.jdk }}
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: aws-actions/configure-aws-credentials@v1
         with:
           role-to-assume: ${{ secrets.PUBLISH_SNAPSHOTS_ROLE }}
diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml
index fa74c70d8..85b9fb168 100644
--- a/.github/workflows/multi-node-test-workflow.yml
+++ b/.github/workflows/multi-node-test-workflow.yml
@@ -7,8 +7,6 @@ on:
   push:
     branches:
       - "*"
-env:
-  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
 
 jobs:
   Get-CI-Image-Tag:
@@ -30,9 +28,11 @@ jobs:
       # this image tag is subject to change as more dependencies and updates will arrive over time
       image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
       # need to switch to root so that github actions can install runner binary on container without permission issues.
-      options: --user root
+      options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
 
     steps:
+      - name: Run start commands
+        run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
       # This step uses the setup-java Github action: https://github.com/actions/setup-java
       - name: Set Up JDK ${{ matrix.java }}
         uses: actions/setup-java@v1
         with:
           java-version: ${{ matrix.java }}
       # This step uses the checkout Github action: https://github.com/actions/checkout
       - name: Checkout Branch
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Run integration tests with multi node config
         run: |
           chown -R 1000:1000 `pwd`
diff --git a/.github/workflows/security-test-workflow.yml b/.github/workflows/security-test-workflow.yml
index 31f7ba9ff..bfa1e0fa3 100644
--- a/.github/workflows/security-test-workflow.yml
+++ b/.github/workflows/security-test-workflow.yml
@@ -12,7 +12,7 @@ jobs:
   build:
     strategy:
       matrix:
-        java: [ 21, 23 ]
+        java: [ 21 ]
     # Job name
     name: Build and test Alerting
     # This job runs on Linux
     runs-on: ubuntu-latest
     steps:
       # This step uses the checkout Github action: https://github.com/actions/checkout
       - name: Checkout Branch
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       # This step uses the setup-java Github action: https://github.com/actions/setup-java
       - name: Set Up JDK ${{ matrix.java }}
         uses: actions/setup-java@v1
diff --git a/.github/workflows/test-workflow.yml b/.github/workflows/test-workflow.yml
index e539d04f9..171ca88c3 100644
--- a/.github/workflows/test-workflow.yml
+++ b/.github/workflows/test-workflow.yml
@@ -7,8 +7,6 @@ on:
   push:
     branches:
       - "*"
-env:
-  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
 
 jobs:
   Get-CI-Image-Tag:
@@ -33,12 +31,14 @@ jobs:
       # this image tag is subject to change as more dependencies and updates will arrive over time
       image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
       # need to switch to root so that github actions can install runner binary on container without permission issues.
-      options: --user root
+      options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
 
     steps:
+      - name: Run start commands
+        run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
       # This step uses the checkout Github action: https://github.com/actions/checkout
       - name: Checkout Branch
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       # This step uses the setup-java Github action: https://github.com/actions/setup-java
       - name: Set Up JDK ${{ matrix.java }}
         uses: actions/setup-java@v1
@@ -54,15 +54,16 @@ jobs:
           cp ./alerting/build/distributions/*.zip alerting-artifacts
       # This step uses the codecov-action Github action: https://github.com/codecov/codecov-action
       - name: Upload Coverage Report
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
         with:
           token: ${{ secrets.CODECOV_TOKEN }}
       # This step uses the upload-artifact Github action: https://github.com/actions/upload-artifact
       - name: Upload Artifacts
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: alerting-plugin-${{ matrix.os }}
           path: alerting-artifacts
+          overwrite: true
 
   build:
     needs: Get-CI-Image-Tag
@@ -85,7 +86,7 @@ jobs:
     steps:
       # This step uses the checkout Github action: https://github.com/actions/checkout
       - name: Checkout Branch
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       # This is a hack, but this step creates a link to the X: mounted drive, which makes the path
       # short enough to work on Windows
       - name: Shorten Path
@@ -107,7 +108,8 @@ jobs:
           cp ./alerting/build/distributions/*.zip alerting-artifacts
       # This step uses the upload-artifact Github action: https://github.com/actions/upload-artifact
       - name: Upload Artifacts
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: alerting-plugin-${{ matrix.os }}
           path: alerting-artifacts
+          overwrite: true

From 605105b54049754e016cbf9a02dafb790a016ecb Mon Sep 17 00:00:00 2001
From: Subhobrata Dey
Date: Wed, 11 Dec 2024 08:50:39 +0000
Subject: [PATCH 6/6] fix ktlint

Signed-off-by: Subhobrata Dey
---
 .../test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
index 49c648e3c..ae5619879 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
@@ -6234,7 +6234,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         )
     }
 
-    fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() {
+    fun `test execute workflow when bucket monitor is used in chained finding of ignored doc monitor`() {
         val query = QueryBuilders.rangeQuery("test_strict_date_time")
             .gt("{{period_end}}||-10d")
             .lte("{{period_end}}")