diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ec421123e..b10755f33 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -289,7 +289,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { execute(DocLevelMonitorFanOutAction.INSTANCE, docLevelMonitorFanOutRequest1, it) } val lastRunContextFromResponse = dlmfor.lastRunContexts as MutableMap> - lastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap + updatedLastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap logger.error(dlmfor) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 8b9671d1f..6f0c5f1ae 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -129,159 +129,164 @@ class TransportDocLevelMonitorFanOutAction monitorCtx: MonitorRunnerExecutionContext, listener: ActionListener, ) { - val monitor = request.monitor - var monitorResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) - // todo periodStart periodEnd - var nonPercolateSearchesTimeTaken = AtomicLong(0) - var percolateQueriesTimeTaken = AtomicLong(0) - var totalDocsQueried = AtomicLong(0) - var docTransformTimeTaken = AtomicLong(0) - val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames - val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames - val monitorMetadata = request.monitorMetadata - val queryToDocIds = mutableMapOf>() - val inputRunResults = mutableMapOf>() - val docsToQueries = mutableMapOf>() - val transformedDocs = mutableListOf>() - val docsSizeInBytes = AtomicLong(0) - val shardIds = request.shardIds - val indexShardsMap: MutableMap> = mutableMapOf() - val queryingStartTimeMillis = System.currentTimeMillis() - for (shardId in shardIds) { - if (indexShardsMap.containsKey(shardId.indexName)) { - indexShardsMap[shardId.indexName]!!.add(shardId.id) - } else { - indexShardsMap[shardId.indexName] = mutableListOf(shardId.id) + try { + val monitor = request.monitor + var monitorResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now()) + // todo periodStart periodEnd + var nonPercolateSearchesTimeTaken = AtomicLong(0) + var percolateQueriesTimeTaken = AtomicLong(0) + var totalDocsQueried = AtomicLong(0) + var docTransformTimeTaken = AtomicLong(0) + val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames + val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames + val monitorMetadata = request.monitorMetadata + val queryToDocIds = mutableMapOf>() + val inputRunResults = mutableMapOf>() + val docsToQueries = mutableMapOf>() + val transformedDocs = mutableListOf>() + val docsSizeInBytes = AtomicLong(0) + val shardIds = request.shardIds + val indexShardsMap: MutableMap> = mutableMapOf() + val queryingStartTimeMillis = System.currentTimeMillis() + for (shardId in shardIds) { + if (indexShardsMap.containsKey(shardId.indexName)) { + indexShardsMap[shardId.indexName]!!.add(shardId.id) + } else { + indexShardsMap[shardId.indexName] = mutableListOf(shardId.id) + } } - } - val lastRunContext = mutableMapOf>() - InputRunResults - val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput - val queries: List = docLevelMonitorInput.queries - val fieldsToBeQueried = mutableSetOf() - for (it in queries) { - if (it.queryFieldNames.isEmpty()) { - fieldsToBeQueried.clear() - logger.debug( - "Monitor ${request.monitor.id} : " + - "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + - "Cannot optimize monitor to fetch only query-relevant fields. " + - "Querying entire doc source." - ) - break + val lastRunContext = mutableMapOf>() + InputRunResults + val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput + val queries: List = docLevelMonitorInput.queries + val fieldsToBeQueried = mutableSetOf() + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + log.debug( + "Monitor ${request.monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) } - fieldsToBeQueried.addAll(it.queryFieldNames) - } - for (entry in indexShardsMap) { - val indexExecutionContext = - request.indexExecutionContexts.stream() - .filter { it.concreteIndexName.equals(entry.key) }.findAny() - .get() - fetchShardDataAndMaybeExecutePercolateQueries( - request.monitor, - monitorCtx, - indexExecutionContext, - request.monitorMetadata, - inputRunResults, - docsToQueries, - transformedDocs, - docsSizeInBytes, - indexExecutionContext.updatedIndexNames, - indexExecutionContext.concreteIndexNames, - ArrayList(fieldsToBeQueried), - nonPercolateSearchesTimeTaken, - percolateQueriesTimeTaken, - totalDocsQueried, - docTransformTimeTaken - ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number - indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + for (entry in indexShardsMap) { + val indexExecutionContext = + request.indexExecutionContexts.stream() + .filter { it.concreteIndexName.equals(entry.key) }.findAny() + .get() + fetchShardDataAndMaybeExecutePercolateQueries( + request.monitor, + monitorCtx, + indexExecutionContext, + request.monitorMetadata, + inputRunResults, + docsToQueries, + transformedDocs, + docsSizeInBytes, + indexExecutionContext.updatedIndexNames, + indexExecutionContext.concreteIndexNames, + ArrayList(fieldsToBeQueried), + nonPercolateSearchesTimeTaken, + percolateQueriesTimeTaken, + totalDocsQueried, + docTransformTimeTaken + ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number + indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + } + lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext } - lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext - } - /* if all indices are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end */ - if (transformedDocs.isNotEmpty()) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - updatedIndexNames, - concreteIndexNames, - inputRunResults, - docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried - ) - } - val took = System.currentTimeMillis() - queryingStartTimeMillis - logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}") - monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end */ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + updatedIndexNames, + concreteIndexNames, + inputRunResults, + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried + ) + } + val took = System.currentTimeMillis() - queryingStartTimeMillis + log.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}") + monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) - /* - populate the map queryToDocIds with pairs of - this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser - */ - queries.forEach { - if (inputRunResults.containsKey(it.id)) { - queryToDocIds[it] = inputRunResults[it.id]!! + /* + populate the map queryToDocIds with pairs of + this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser + */ + queries.forEach { + if (inputRunResults.containsKey(it.id)) { + queryToDocIds[it] = inputRunResults[it.id]!! + } } - } - val idQueryMap: Map = queries.associateBy { it.id } + val idQueryMap: Map = queries.associateBy { it.id } - val triggerResults = mutableMapOf() - // If there are no triggers defined, we still want to generate findings - if (monitor.triggers.isEmpty()) { - if (monitor.id != Monitor.NO_ID) { - logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") - createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) + val triggerResults = mutableMapOf() + // If there are no triggers defined, we still want to generate findings + if (monitor.triggers.isEmpty()) { + if (monitor.id != Monitor.NO_ID) { + log.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) + } + } else { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTrigger( + monitorCtx, + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryToDocIds, + false, + executionId = request.executionId, + workflowRunContext = request.workflowRunContext + ) + } } - } else { - monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTrigger( - monitorCtx, - monitorResult, - it as DocumentLevelTrigger, - monitor, - idQueryMap, - docsToQueries, - queryToDocIds, - false, + + // If any error happened during trigger execution, upsert monitor error alert + val errorMessage = + constructErrorMessageFromTriggerResults(triggerResults = triggerResults) + if (errorMessage.isNotEmpty()) { + monitorCtx.alertService!!.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, executionId = request.executionId, - workflowRunContext = request.workflowRunContext + request.workflowRunContext ) + } else { + onSuccessfulMonitorRun(monitorCtx, monitor) } - } - - // If any error happened during trigger execution, upsert monitor error alert - val errorMessage = - constructErrorMessageFromTriggerResults(triggerResults = triggerResults) - if (errorMessage.isNotEmpty()) { - monitorCtx.alertService!!.upsertMonitorErrorAlert( - monitor = monitor, - errorMessage = errorMessage, - executionId = request.executionId, - request.workflowRunContext + listener.onResponse( + DocLevelMonitorFanOutResponse( + nodeId = monitorCtx.clusterService!!.localNode().id, + executionId = request.executionId, + monitorId = monitor.id, + shardIdFailureMap = emptyMap(), + findingIds = emptyList(), + lastRunContext as MutableMap, + InputRunResults(listOf(inputRunResults)), + triggerResults + ) ) - } else { - onSuccessfulMonitorRun(monitorCtx, monitor) + } catch (e: Exception) { + log.error("${request.monitor.id} Failed to run fan_out on node ${monitorCtx.clusterService.localNode().id} due to error",e) + listener.onFailure(e) } - listener.onResponse( - DocLevelMonitorFanOutResponse( - nodeId = monitorCtx.clusterService!!.localNode().id, - executionId = request.executionId, - monitorId = monitor.id, - shardIdFailureMap = emptyMap(), - findingIds = emptyList(), - lastRunContext as MutableMap, - InputRunResults(listOf(inputRunResults)), - triggerResults - ) - ) } private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) { @@ -349,7 +354,7 @@ class TransportDocLevelMonitorFanOutAction val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) .string() - logger.debug("Findings: $findingStr") + log.debug("Findings: $findingStr") if (shouldCreateFinding) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) @@ -367,11 +372,11 @@ class TransportDocLevelMonitorFanOutAction if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> if (item.isFailed) { - logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + log.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") } } } else { - logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + log.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } return findingDocPairs @@ -465,7 +470,7 @@ class TransportDocLevelMonitorFanOutAction docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) } } catch (e: Exception) { - logger.error( + log.error( "Monitor ${monitor.id} :" + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + "Error: ${e.message}", @@ -562,7 +567,7 @@ class TransportDocLevelMonitorFanOutAction val message = "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + " sourceIndices: $monitorInputIndices" - logger.error(message) + log.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) @@ -573,7 +578,7 @@ class TransportDocLevelMonitorFanOutAction val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - logger.debug( + log.debug( "Monitor ${monitor.id}: " + "Executing percolate query for docs from source indices " + "$monitorInputIndices against query index $queryIndices" @@ -598,8 +603,8 @@ class TransportDocLevelMonitorFanOutAction "Response status is ${response.status()}" ) } - logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") - logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response") + log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response") percolateQueriesTimeTaken.getAndAdd(response.took.millis) return response.hits } @@ -643,7 +648,7 @@ class TransportDocLevelMonitorFanOutAction TransformedDocDto(index, concreteIndex, hit.id, sourceRef) ) } catch (e: Exception) { - logger.error( + log.error( "Monitor $monitorId: Failed to transform payload $hit for percolate query", e ) @@ -1052,3 +1057,4 @@ class TransportDocLevelMonitorFanOutAction return NotificationActionConfigs(destination, channel) } } +}