diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..272da7385 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -57,6 +57,7 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.percolator.PercolateQueryBuilderExt +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -219,39 +220,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs( + fetchDataAndExecutePercolateQueriesPerShard( monitor, monitorCtx, docExecutionContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), - matchingDocIdsPerIndex?.get(concreteIndexName) + matchingDocIdsPerIndex?.get(concreteIndexName), + monitorMetadata, + inputRunResults, + docsToQueries ) - - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, - monitor, - monitorMetadata, - updatedIndexName, - concreteIndexName - ) - - matchedQueriesForDocs.forEach { hit -> - val id = hit.id - .replace("_${updatedIndexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) - } - } - } } } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -598,17 +578,25 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - private suspend fun getMatchingDocs( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough) + * 2. Transform documents to submit to percolate query + * 3. perform percolate queries + * 4. update docToQueries Map with all hits from percolate queries + * */ + private suspend fun fetchDataAndExecutePercolateQueriesPerShard( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String, - concreteIndex: String, + indexName: String, + concreteIndexName: String, conflictingFields: List, - docIds: List? = null - ): List> { + docIds: List? = null, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val matchingDocs = mutableListOf>() + val transformedDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { @@ -617,24 +605,59 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - concreteIndex, + concreteIndexName, shard, prevSeqNo, maxSeqNo, null, docIds ) + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields + ) + ) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id} :" + + " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + e + ) + } + + if (transformedDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries( + monitorCtx, + transformedDocs.map { it.second }, + monitor, + monitorMetadata, + indexName, + concreteIndexName + ) + + matchedQueriesForDocs.forEach { hit -> + val id = hit.id + .replace("_${indexName}_${monitor.id}", "") + .replace("_${concreteIndexName}_${monitor.id}", "") - if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + } } - } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") } } - return matchingDocs } + /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ private suspend fun searchShard( monitorCtx: MonitorRunnerExecutionContext, index: String, @@ -665,12 +688,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { SearchSourceBuilder() .version(true) .query(boolQueryBuilder) - .size(10000) // fixme: make this configurable. + .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search shard: $shard") + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } return response.hits } @@ -681,7 +704,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor: Monitor, monitorMetadata: MonitorMetadata, index: String, - concreteIndex: String + concreteIndex: String, ): SearchHits { val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) @@ -722,32 +745,37 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs( + /** Transform field names and index names in all the search hits to format required to run percolate search against them. + * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( hits: SearchHits, index: String, concreteIndex: String, monitorId: String, - conflictingFields: List + conflictingFields: List, ): List> { - return hits.map { hit -> - val sourceMap = hit.sourceAsMap - - transformDocumentFieldNames( - sourceMap, - conflictingFields, - "_${index}_$monitorId", - "_${concreteIndex}_$monitorId", - "" - ) - - var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) - - val sourceRef = BytesReference.bytes(xContentBuilder) - - logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) - - Pair(hit.id, sourceRef) - } + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = hit.sourceAsMap + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + logger.debug( + "Monitor $monitorId: Document [${hit.id}] payload after transform for percolate query: ${sourceRef.utf8ToString()}", + ) + return Pair(hit.id, sourceRef) + } catch (e: Exception) { + logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) } /**