From f6a6375e819ba5d4a15a6906942ce6b5aa25af7c Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 11 Dec 2024 03:18:42 +0000 Subject: [PATCH] address comments Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 8 ++++-- .../org/opensearch/alerting/InputService.kt | 2 +- .../TransportDocLevelMonitorFanOutAction.kt | 26 +++++++++++-------- .../workflow/CompositeWorkflowRunner.kt | 5 ++-- .../org/opensearch/alerting/TestHelpers.kt | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 267a1be6d..16272fd99 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -138,8 +138,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } // Map of document ids per index when monitor is workflow delegate and has chained findings - val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first - val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds + } else { + listOf() + } val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 3c8194ceb..c7eeac833 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -90,7 +90,7 @@ class InputService( periodStart = periodStart, periodEnd = periodEnd, prevResult = prevResult, - matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first, + matchingDocIdsPerIndex = matchingDocIdsPerIndex, returnSampleDocs = false ) val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 590732da4..7e6f2ed1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -301,7 +301,7 @@ class TransportDocLevelMonitorFanOutAction * if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger * generates a single alert with multiple findings. */ - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { monitor.triggers.forEach { triggerResults[it.id] = runForEachDocTrigger( monitorResult, @@ -316,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction workflowRunContext = workflowRunContext ) } - } else if (monitor.shouldPersistFindingsAndAlerts == true) { + } else if (monitor.shouldCreateSingleAlertForFindings == true) { monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts( + triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert( monitorResult, it as DocumentLevelTrigger, monitor, @@ -370,7 +370,7 @@ class TransportDocLevelMonitorFanOutAction /** * run doc-level triggers ignoring findings and alerts and generating a single alert. */ - private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts( + private suspend fun runForEachDocTriggerCreateSingleGroupedAlert( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, monitor: Monitor, @@ -381,14 +381,14 @@ class TransportDocLevelMonitorFanOutAction ): DocumentLevelTriggerRunResult { val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) if (triggerResult.triggeredDocs.isNotEmpty()) { - val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) { - workflowRunContext.matchingDocIdsPerIndex.second + val findingIds = if (workflowRunContext?.findingIds != null) { + workflowRunContext.findingIds } else { listOf() } val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val alert = alertService.composeDocLevelAlert( - findingIds, + findingIds!!, triggerResult.triggeredDocs, triggerCtx, monitorResult.alertError() ?: triggerResult.alertError(), @@ -582,7 +582,11 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) { + if (shouldCreateFinding and ( + monitor.shouldCreateSingleAlertForFindings == null || + monitor.shouldCreateSingleAlertForFindings == false + ) + ) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -594,7 +598,7 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { try { findings.forEach { finding -> publishFinding(monitor, finding) @@ -957,11 +961,11 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { + if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) { if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } - } else if (monitor.shouldPersistFindingsAndAlerts == true) { + } else if (monitor.shouldCreateSingleAlertForFindings == true) { val docIdsParam = mutableListOf() if (docIds != null) { docIdsParam.addAll(docIds) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 2ca8f978a..1e613bd0f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -131,9 +131,10 @@ object CompositeWorkflowRunner : WorkflowRunner() { workflowId = workflowMetadata.workflowId, workflowMetadataId = workflowMetadata.id, chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, - matchingDocIdsPerIndex = indexToDocIdsWithFindings!!, + matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first, auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true - else workflow.auditDelegateMonitorAlerts!! + else workflow.auditDelegateMonitorAlerts!!, + findingIds = indexToDocIdsWithFindings.second ) try { dataSources = delegateMonitor.dataSources diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 43272107b..2330974f4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -223,7 +223,7 @@ fun randomDocumentLevelMonitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, - ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner + shouldCreateSingleAlertForFindings = ignoreFindingsAndAlerts, owner = owner ) }