diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 204acdd3d..f2af4140e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -36,6 +36,7 @@ import org.opensearch.core.rest.RestStatus import org.opensearch.index.IndexNotFoundException import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.node.NodeClosedException +import org.opensearch.transport.ActionNotFoundTransportException import org.opensearch.transport.ConnectTransportException import org.opensearch.transport.RemoteTransportException import org.opensearch.transport.TransportException @@ -281,7 +282,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { e is RemoteTransportException && ( cause is NodeClosedException || - cause is CircuitBreakingException + cause is CircuitBreakingException || + cause is ActionNotFoundTransportException ) ) ) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 48f55dadc..343bac978 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -152,8 +152,6 @@ class TransportDocLevelMonitorFanOutAction var totalDocsSizeInBytesStat = 0L var docsSizeOfBatchInBytes = 0L var findingsToTriggeredQueries: Map> = mutableMapOf() - // Maps a finding ID to the related document. - private val findingIdToDocSource = mutableMapOf() @Volatile var percQueryMaxNumDocsInMemory: Int = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(settings) @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(settings) @@ -164,10 +162,6 @@ class TransportDocLevelMonitorFanOutAction @Volatile var allowList: List = DestinationSettings.ALLOW_LIST.get(settings) @Volatile var fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(settings) - /* Contains list of docs source that are held in memory to submit to percolate query against query index. - * Docs are fetched from the source index per shard and transformed.*/ - val transformedDocs = mutableListOf>() - init { clusterService.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { percQueryMaxNumDocsInMemory = it @@ -227,10 +221,10 @@ class TransportDocLevelMonitorFanOutAction val queryToDocIds = mutableMapOf>() val inputRunResults = mutableMapOf>() val docsToQueries = mutableMapOf>() + val transformedDocs = mutableListOf>() + val findingIdToDocSource = mutableMapOf() val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID - val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() - else monitorMetadata.lastRunContext.toMutableMap() as MutableMap> val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput val queries: List = docLevelMonitorInput.queries val fieldsToBeQueried = mutableSetOf() @@ -265,7 +259,8 @@ class TransportDocLevelMonitorFanOutAction updatedIndexNames, concreteIndicesSeenSoFar, ArrayList(fieldsToBeQueried), - shardIds.map { it.id } + shardIds.map { it.id }, + transformedDocs ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo } @@ -277,6 +272,7 @@ class TransportDocLevelMonitorFanOutAction concreteIndicesSeenSoFar, inputRunResults, docsToQueries, + transformedDocs ) } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -311,6 +307,7 @@ class TransportDocLevelMonitorFanOutAction queryToDocIds, dryrun, executionId = executionId, + findingIdToDocSource, workflowRunContext = workflowRunContext ) } @@ -357,6 +354,7 @@ class TransportDocLevelMonitorFanOutAction queryToDocIds: Map>, dryrun: Boolean, executionId: String, + findingIdToDocSource: MutableMap, workflowRunContext: WorkflowRunContext? ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) @@ -389,7 +387,8 @@ class TransportDocLevelMonitorFanOutAction if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty()) getDocSources( findingToDocPairs = findingToDocPairs, - monitor = monitor + monitor = monitor, + findingIdToDocSource = findingIdToDocSource ) val alerts = mutableListOf() @@ -674,6 +673,7 @@ class TransportDocLevelMonitorFanOutAction concreteIndices: List, fieldsToBeQueried: List, shardList: List, + transformedDocs: MutableList>, updateLastRunContext: (String, String) -> Unit ) { for (shardId in shardList) { @@ -723,6 +723,7 @@ class TransportDocLevelMonitorFanOutAction concreteIndices, inputRunResults, docsToQueries, + transformedDocs ) } docTransformTimeTakenStat += System.currentTimeMillis() - startTime @@ -749,6 +750,7 @@ class TransportDocLevelMonitorFanOutAction concreteIndices, inputRunResults, docsToQueries, + transformedDocs ) } } @@ -761,6 +763,7 @@ class TransportDocLevelMonitorFanOutAction concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, + transformedDocs: MutableList> ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -1042,7 +1045,8 @@ class TransportDocLevelMonitorFanOutAction */ private suspend fun getDocSources( findingToDocPairs: List>, - monitor: Monitor + monitor: Monitor, + findingIdToDocSource: MutableMap ) { val docFieldTags = parseSampleDocTags(monitor.triggers) val request = MultiGetRequest()