diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5993dd929..84f737467 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,7 +8,6 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException -import org.opensearch.action.ActionListener import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction @@ -27,12 +26,12 @@ import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -47,8 +46,12 @@ import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.util.string +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders @@ -70,7 +73,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext?, + executionId: String ): MonitorRunResult { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID @@ -96,7 +101,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata( monitor = monitor, createWithRunContext = false, - skipIndex = isTempMonitor + skipIndex = isTempMonitor, + workflowRunContext?.workflowMetadataId ) val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput @@ -114,11 +120,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices - val indices = IndexUtils.resolveAllIndices( + val concreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + if (concreteIndices.isEmpty()) { + logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}") + throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) + } monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( @@ -130,64 +140,91 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // cleanup old indices that are not monitored anymore from the same monitor for (ind in updatedLastRunContext.keys) { - if (!indices.contains(ind)) { + if (!concreteIndices.contains(ind)) { updatedLastRunContext.remove(ind) } } - indices.forEach { indexName -> - // Prepare lastRunContext for each index - val indexLastRunContext = lastRunContext.getOrPut(indexName) { - val isIndexCreatedRecently = createdRecently( - monitor, - periodStart, - periodEnd, - monitorCtx.clusterService!!.state().metadata.index(indexName) - ) - MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently) - } + // Map of document ids per index when monitor is workflow delegate and has chained findings + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( - indexLastRunContext.toMutableMap(), - monitorCtx, - indexName - ) as MutableMap - updatedLastRunContext[indexName] = indexUpdatedRunContext - - val count: Int = indexLastRunContext["shards_count"] as Int - for (i: Int in 0 until count) { - val shard = i.toString() - - // update lastRunContext if its a temp monitor as we only want to view the last bit of data then - // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data - if (isTempMonitor) { - indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + docLevelMonitorInput.indices.forEach { indexName -> + val concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + val updatedIndexName = indexName.replace("*", "_") + val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( + monitorCtx.clusterService!!.state(), + concreteIndices + ) + + concreteIndices.forEach { concreteIndexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + ) + MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) } - } - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) + // Prepare updatedLastRunContext for each index + val indexUpdatedRunContext = updateLastRunContext( + indexLastRunContext.toMutableMap(), + monitorCtx, + concreteIndexName + ) as MutableMap + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + + val count: Int = indexLastRunContext["shards_count"] as Int + for (i: Int in 0 until count) { + val shard = i.toString() + + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + } + } - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) + // Prepare DocumentExecutionContext for each index + val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, + val matchingDocs = getMatchingDocs( monitor, - monitorMetadata, - indexName + monitorCtx, + docExecutionContext, + updatedIndexName, + concreteIndexName, + conflictingFields.toList(), + matchingDocIdsPerIndex?.get(concreteIndexName) ) - matchedQueriesForDocs.forEach { hit -> - val id = hit.id.replace("_${indexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$indexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries( + monitorCtx, + matchingDocs.map { it.second }, + monitor, + monitorMetadata, + updatedIndexName, + concreteIndexName + ) + + matchedQueriesForDocs.forEach { hit -> + val id = hit.id + .replace("_${updatedIndexName}_${monitor.id}", "") + .replace("_${concreteIndexName}_${monitor.id}", "") + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + } } } } @@ -226,7 +263,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { idQueryMap, docsToQueries, queryToDocIds, - dryrun + dryrun, + executionId = executionId, + workflowRunContext = workflowRunContext ) } } @@ -235,7 +274,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If any error happened during trigger execution, upsert monitor error alert val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) if (errorMessage.isNotEmpty()) { - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, + executionId = executionId, + workflowRunContext + ) } else { onSuccessfulMonitorRun(monitorCtx, monitor) } @@ -250,7 +294,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return monitorResult.copy(triggerResults = triggerResults) } catch (e: Exception) { val errorMessage = ExceptionsHelper.detailedMessage(e) - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext) logger.error("Failed running Document-level-monitor ${monitor.name}", e) val alertingException = AlertingException( errorMessage, @@ -298,7 +342,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { idQueryMap: Map, docsToQueries: Map>, queryToDocIds: Map>, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext?, + executionId: String ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) @@ -309,7 +355,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // TODO: Implement throttling for findings docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID) + val findingId = createFindings( + monitor, + monitorCtx, + triggeredQueries, + it.key, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) findings.add(findingId) if (triggerResult.triggeredDocs.contains(it.key)) { @@ -329,7 +382,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { listOf(it.first), listOf(it.second), triggerCtx, - monitorResult.alertError() ?: triggerResult.alertError() + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext ) alerts.add(alert) } @@ -369,7 +424,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { alert.copy(actionExecutionResults = actionExecutionResults) } - monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) } + monitorCtx.retryPolicy?.let { + monitorCtx.alertService!!.saveAlerts( + monitor.dataSources, + updatedAlerts, + it, + routingId = monitor.id + ) + } } return triggerResult } @@ -379,7 +441,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, docLevelQueries: List, matchingDocId: String, - shouldCreateFinding: Boolean + shouldCreateFinding: Boolean, + workflowExecutionId: String? = null, ): String { // Before the "|" is the doc id and after the "|" is the index val docIndex = matchingDocId.split("|") @@ -392,7 +455,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorName = monitor.name, index = docIndex[1], docLevelQueries = docLevelQueries, - timestamp = Instant.now() + timestamp = Instant.now(), + executionId = workflowExecutionId ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() @@ -498,8 +562,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } - if (response.hits.hits.isEmpty()) + if (response.hits.hits.isEmpty()) { return -1L + } return response.hits.hits[0].seqNo } @@ -513,7 +578,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String + index: String, + concreteIndex: String, + conflictingFields: List, + docIds: List? = null ): List> { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int val matchingDocs = mutableListOf>() @@ -525,15 +593,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - index, + concreteIndex, shard, prevSeqNo, maxSeqNo, - null + null, + docIds ) if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, monitor.id)) + matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) } } catch (e: Exception) { logger.warn("Failed to run for shard $shard. Error: ${e.message}") @@ -548,7 +617,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String? + query: String?, + docIds: List? = null ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -560,6 +630,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) } + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -581,7 +655,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docs: List, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String + index: String, + concreteIndex: String ): SearchHits { val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) @@ -594,7 +669,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] if (queryIndex == null) { val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}" + " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) @@ -622,11 +697,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List> { + private fun getAllDocs( + hits: SearchHits, + index: String, + concreteIndex: String, + monitorId: String, + conflictingFields: List + ): List> { return hits.map { hit -> val sourceMap = hit.sourceAsMap - transformDocumentFieldNames(sourceMap, "_${index}_$monitorId") + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) @@ -639,7 +726,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } /** - * Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names. + * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names + * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. * * Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV: * { { @@ -653,17 +741,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() { */ private fun transformDocumentFieldNames( jsonAsMap: MutableMap, - fieldNameSuffix: String + conflictingFields: List, + fieldNameSuffixPattern: String, + fieldNameSuffixIndex: String, + fieldNamePrefix: String ) { val tempMap = mutableMapOf() val it: MutableIterator> = jsonAsMap.entries.iterator() while (it.hasNext()) { val entry = it.next() if (entry.value is Map<*, *>) { - transformDocumentFieldNames(entry.value as MutableMap, fieldNameSuffix) - } else if (entry.key.endsWith(fieldNameSuffix) == false) { - tempMap["${entry.key}$fieldNameSuffix"] = entry.value - it.remove() + transformDocumentFieldNames( + entry.value as MutableMap, + conflictingFields, + fieldNameSuffixPattern, + fieldNameSuffixIndex, + if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}" + ) + } else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) { + var alreadyReplaced = false + conflictingFields.forEach { conflictingField -> + if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) { + tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value + it.remove() + alreadyReplaced = true + } + } + if (!alreadyReplaced) { + tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value + it.remove() + } } } jsonAsMap.putAll(tempMap) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 9b008a081..e5ad5578c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.MonitorRunnerService.monitorCtx import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -155,8 +156,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ fun traverseMappingsAndUpdate( node: MutableMap, currentPath: String, - processLeafFn: (String, MutableMap) -> Triple>, - flattenPaths: MutableList + processLeafFn: (String, String, MutableMap) -> Triple>, + flattenPaths: MutableMap> ) { // If node contains "properties" property then it is internal(non-leaf) node log.debug("Node in traverse: $node") @@ -170,10 +171,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // If it has type property and type is not "nested" then this is a leaf if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { // At this point we know full path of node, so we add it to output array - flattenPaths.add(fullPath) + flattenPaths.put(fullPath, nodeProps) // Calls processLeafFn and gets old node name, new node name and new properties of node. // This is all information we need to update this node - val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) + val (oldName, newName, props) = processLeafFn(it.key, fullPath, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) } else { // Internal(non-leaf) node - visit children @@ -201,59 +202,105 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput val queries: List = docLevelMonitorInput.queries - val indices = IndexUtils.resolveAllIndices( - docLevelMonitorInput.indices, - monitorCtx.clusterService!!, - monitorCtx.indexNameExpressionResolver!! - ) - + val indices = docLevelMonitorInput.indices val clusterState = clusterService.state() // Run through each backing index and apply appropriate mappings to query index - indices?.forEach { indexName -> - if (clusterState.routingTable.hasIndex(indexName)) { - val indexMetadata = clusterState.metadata.index(indexName) - if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { - val properties = ( - (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as MutableMap - ) - // Node processor function is used to process leaves of index mappings tree - // - val leafNodeProcessor = - fun(fieldName: String, props: MutableMap): Triple> { - val newProps = props.toMutableMap() - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { - mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + indices.forEach { indexName -> + val concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + val updatedIndexName = indexName.replace("*", "_") + val updatedProperties = mutableMapOf() + val allFlattenPaths = mutableSetOf>() + var sourceIndexFieldLimit = 0L + val conflictingFields = getAllConflictingFields(clusterState, concreteIndices) + + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, fullPath: String, props: MutableMap): + Triple> { + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } + } + } + + return if (conflictingFields.contains(fullPath)) { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${concreteIndexName}_$monitorId" + } + Triple(fieldName, "${fieldName}_${concreteIndexName}_$monitorId", newProps) + } else { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId" } + Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps) } } - if (props.containsKey("path")) { - newProps["path"] = "${props["path"]}_${indexName}_$monitorId" + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableMapOf>() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + flattenPaths.keys.forEach { allFlattenPaths.add(Pair(it, concreteIndexName)) } + // Updated mappings ready to be applied on queryIndex + properties.forEach { + if ( + it.value is Map<*, *> && + (it.value as Map).containsKey("type") && + (it.value as Map)["type"] == NESTED + ) { + } else { + if (updatedProperties.containsKey(it.key) && updatedProperties[it.key] != it.value) { + val mergedField = mergeConflictingFields( + updatedProperties[it.key] as Map, + it.value as Map + ) + updatedProperties[it.key] = mergedField + } else { + updatedProperties[it.key] = it.value + } } - return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } - // Traverse and update index mappings here while extracting flatten field paths - val flattenPaths = mutableListOf() - traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) - // Updated mappings ready to be applied on queryIndex - val updatedProperties = properties - // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. - var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( - monitor, - monitorMetadata, - indexName, - updatedProperties - ) - - if (updateMappingResponse.isAcknowledged) { - doIndexAllQueries(concreteQueryIndex, indexName, monitorId, queries, flattenPaths, refreshPolicy, indexTimeout) + sourceIndexFieldLimit += checkMaxFieldLimit(concreteIndexName) } } } + // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. + val (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( + monitor, + monitorMetadata, + updatedIndexName, + sourceIndexFieldLimit, + updatedProperties + ) + + if (updateMappingResponse.isAcknowledged) { + doIndexAllQueries( + concreteQueryIndex, + updatedIndexName, + monitorId, + queries, + allFlattenPaths, + conflictingFields, + refreshPolicy, + indexTimeout + ) + } } } @@ -262,18 +309,60 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ sourceIndex: String, monitorId: String, queries: List, - flattenPaths: MutableList, + flattenPaths: MutableSet>, + conflictingPaths: Set, refreshPolicy: RefreshPolicy, indexTimeout: TimeValue ) { val indexRequests = mutableListOf() + val conflictingPathToConcreteIndices = mutableMapOf>() + flattenPaths.forEach { fieldPath -> + if (conflictingPaths.contains(fieldPath.first)) { + if (conflictingPathToConcreteIndices.containsKey(fieldPath.first)) { + val concreteIndexSet = conflictingPathToConcreteIndices[fieldPath.first] + concreteIndexSet!!.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } else { + val concreteIndexSet = mutableSetOf() + concreteIndexSet.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } + } + } + + val newQueries = mutableListOf() queries.forEach { + val filteredConcreteIndices = mutableSetOf() + var query = it.query + conflictingPaths.forEach { conflictingPath -> + if (query.contains(conflictingPath)) { + query = query.replace("$conflictingPath:", "${conflictingPath}__$monitorId:") + filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!) + } + } + + if (filteredConcreteIndices.isNotEmpty()) { + filteredConcreteIndices.forEach { filteredConcreteIndex -> + val newQuery = it.copy( + id = "${it.id}_$filteredConcreteIndex", + query = query.replace("", filteredConcreteIndex) + ) + newQueries.add(newQuery) + } + } else { + newQueries.add(it.copy(id = "${it.id}_$sourceIndex")) + } + } + + newQueries.forEach { var query = it.query flattenPaths.forEach { fieldPath -> - query = query.replace("$fieldPath:", "${fieldPath}_${sourceIndex}_$monitorId:") + if (!conflictingPaths.contains(fieldPath.first)) { + query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:") + } } val indexRequest = IndexRequest(concreteQueryIndex) - .id(it.id + "_${sourceIndex}_$monitorId") + .id(it.id + "_$monitorId") .source( mapOf( "query" to mapOf("query_string" to mapOf("query" to query)), @@ -303,6 +392,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ monitor: Monitor, monitorMetadata: MonitorMetadata, sourceIndex: String, + sourceIndexFieldLimit: Long, updatedProperties: MutableMap ): Pair { var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] @@ -325,7 +415,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ var updateMappingResponse = AcknowledgedResponse(false) try { // Adjust max field limit in mappings for query index, if needed. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) updateMappingResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) } @@ -339,7 +429,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Do queryIndex rollover targetQueryIndex = rolloverQueryIndex(monitor) // Adjust max field limit in mappings for new index. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) // PUT mappings to newly created index val updateMappingRequest = PutMappingRequest(targetQueryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) @@ -379,26 +469,96 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return Pair(updateMappingResponse, targetQueryIndex) } + /** + * merge conflicting leaf fields in the mapping tree + */ + private fun mergeConflictingFields(oldField: Map, newField: Map): Map { + val mergedField = mutableMapOf() + oldField.entries.forEach { + if (newField.containsKey(it.key)) { + if (it.value is Map<*, *> && newField[it.key] is Map<*, *>) { + mergedField[it.key] = + mergeConflictingFields(it.value as Map, newField[it.key] as Map) + } else { + mergedField[it.key] = it.value + } + } else { + mergedField[it.key] = it.value + } + } + + newField.entries.forEach { + if (!oldField.containsKey(it.key)) { + mergedField[it.key] = it.value + } + } + return mergedField + } + + /** + * get all fields which have same name but different mappings belonging to an index pattern + */ + fun getAllConflictingFields(clusterState: ClusterState, concreteIndices: List): Set { + val conflictingFields = mutableSetOf() + val allFlattenPaths = mutableMapOf>() + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, _: String, props: MutableMap): Triple> { + return Triple(fieldName, fieldName, props) + } + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableMapOf>() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + + flattenPaths.forEach { + if (allFlattenPaths.containsKey(it.key) && allFlattenPaths[it.key]!! != it.value) { + conflictingFields.add(it.key) + } + allFlattenPaths.putIfAbsent(it.key, it.value) + } + } + } + } + return conflictingFields + } + + /** + * checks the max field limit for a concrete index + */ + private suspend fun checkMaxFieldLimit(sourceIndex: String): Long { + val getSettingsResponse: GetSettingsResponse = client.suspendUntil { + admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex), it) + } + return getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L + } + /** * Adjusts max field limit index setting for query index if source index has higher limit. * This will prevent max field limit exception, when source index has more fields then query index limit */ - private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) { + private suspend fun adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit: Long, concreteQueryIndex: String) { val getSettingsResponse: GetSettingsResponse = client.suspendUntil { - admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it) + admin().indices().getSettings(GetSettingsRequest().indices(concreteQueryIndex), it) } - val sourceIndexLimit = - getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L val queryIndexLimit = getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L // Our query index initially has 3 fields we defined and 5 more builtin metadata fields in mappings so we have to account for that - if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { + if (sourceIndexFieldLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil { admin().indices().updateSettings( UpdateSettingsRequest(concreteQueryIndex).settings( Settings.builder().put( INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, - sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT + sourceIndexFieldLimit + QUERY_INDEX_BASE_FIELDS_COUNT ) ), it diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 9edf19f08..ae312baf4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -21,7 +21,6 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.util.IndexUtils import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser -import org.opensearch.index.IndexNotFoundException class IndexUtils { @@ -150,10 +149,6 @@ class IndexUtils { result.addAll(concreteIndices) } - if (result.size == 0) { - throw IndexNotFoundException(indices[0]) - } - return result } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 51027a6b1..c8ab62933 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -5,10 +5,14 @@ package org.opensearch.alerting +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN import org.opensearch.client.Response import org.opensearch.client.ResponseException +import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput @@ -17,6 +21,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope +import org.opensearch.core.rest.RestStatus import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -495,6 +500,534 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } + fun `test execute monitor with indices having fields with same name but different data types`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source.device.port": { "type": "long" }, + "source.device.hwd.id": { "type": "long" }, + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + }, + "my_join_field": { + "type": "join", + "relations": { + "question": "answer" + } + }, + "test_field" : { "type" : "integer" } + } + """.trimIndent() + ) + var testDoc = """{ + "source" : { "device": {"port" : 12345 } }, + "nested_field": { "test1": "some text" }, + "test_field": 12345 + }""" + + val docQuery1 = DocLevelQuery( + query = "(source.device.port:12345 AND test_field:12345) OR source.device.hwd.id:12345", + name = "4" + ) + val docQuery2 = DocLevelQuery( + query = "(source.device.port:\"12345\" AND test_field:\"12345\") OR source.device.hwd.id:\"12345\"", + name = "5" + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) + + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) + + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) + + indexDoc(testIndex, "2", testDoc) + + // no fields expanded as only index test1 is present + val oldExpectedQueries = listOf( + "(source.device.port_test__${monitor.id}:12345 AND test_field_test__${monitor.id}:12345) OR " + + "source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test__${monitor.id}:\"12345\" AND test_field_test__${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(oldExpectedQueries.contains(query)) + } + + val testIndex2 = createTestIndex( + "test2", + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + ) + testDoc = """{ + "source" : { "device": {"port" : "12345" } }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + // only fields source.device.port & test_field is expanded as they have same name but different data types + // in indices test1 & test2 + val newExpectedQueries = listOf( + "(source.device.port_test2_${monitor.id}:12345 AND test_field_test2_${monitor.id}:12345) " + + "OR source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test1_${monitor.id}:12345 AND test_field_test1_${monitor.id}:12345) " + + "OR source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test2_${monitor.id}:\"12345\" AND test_field_test2_${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"", + "(source.device.port_test1_${monitor.id}:\"12345\" AND test_field_test1_${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"" + ) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(oldExpectedQueries.contains(query) || newExpectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but with different nesting`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "nested_field": { + "properties": { + "test1": { + "type": "keyword" + } + } + } + } + """.trimIndent() + ) + val testDoc = """{ + "nested_field": { "test1": "12345" } + }""" + + val docQuery = DocLevelQuery( + query = "nested_field.test1:\"12345\"", + name = "5" + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + // as mappings of source.id & test_field are different so, both of them expands + val expectedQueries = listOf( + "nested_field.test1_test__${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but different field mappings`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + }, + "test_field" : { + "type":"text", + "analyzer":"whitespace" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text" + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + val testDoc = """{ + "source" : {"id" : "12345" }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery = DocLevelQuery( + query = "test_field:\"12345\" AND source.id:\"12345\"", + name = "5" + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + // as mappings of source.id & test_field are different so, both of them expands + val expectedQueries = listOf( + "test_field_test2_${monitor.id}:\"12345\" AND source.id_test2_${monitor.id}:\"12345\"", + "test_field_test1_${monitor.id}:\"12345\" AND source.id_test1_${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but different field mappings in multiple indices`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "test_field" : { + "type":"keyword" + } + } + """.trimIndent() + ) + + val testIndex4 = createTestIndex( + "test4", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testDoc1 = """{ + "source" : {"device" : {"hwd" : {"id" : "12345"}} }, + "nested_field": { "test1": "some text" } + }""" + val testDoc2 = """{ + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery1 = DocLevelQuery( + query = "test_field:\"12345\"", + name = "4" + ) + val docQuery2 = DocLevelQuery( + query = "source.device.hwd.id:\"12345\"", + name = "5" + ) + + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex4, "1", testDoc1) + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex, "1", testDoc1) + indexDoc(testIndex, "2", testDoc2) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 4, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 4, findings.size) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + val httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) } + } + + fun `test no of queries generated for document-level monitor based on wildcard indexes`() { + val testIndex = createTestIndex("test1") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + + val testIndex2 = createTestIndex("test2") + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + } + + fun `test execute monitor with new index added after first execution that generates alerts and findings from new query`() { + val testIndex = createTestIndex("test1") + val testIndex2 = createTestIndex("test2") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery2 = DocLevelQuery(query = "test_field_new:\"us-west-2\"", name = "4") + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "5", testDoc) + executeMonitor(monitor.id) + + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) + + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) + + val testDocNew = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field_new" : "us-west-2" + }""" + + val testIndex3 = createTestIndex("test3") + indexDoc(testIndex3, "10", testDocNew) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery2.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("10|$testIndex3"))) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + + foundFindings = findings.filter { + it.relatedDocIds.contains("10") + } + assertEquals("Findings saved for test monitor expected 10", 1, foundFindings.size) + } + fun `test document-level monitor when alias only has write index with 0 docs`() { // Monitor should execute, but create 0 findings. val alias = createTestAlias(includeWriteIndex = true)