diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3ca2a9e24..3b036a382 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -101,18 +101,54 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } /** - * From given index mapping node, extracts fieldName -> fieldProperties pair + * Does a DFS traversal of index mappings tree. + * Calls processLeafFn on every leaf node. + * Populates flattenPaths list with full paths of leaf nodes + * @param node current node which we're visiting + * @param currentPath current node path from root node + * @param processLeafFn leaf processor function which is called on every leaf discovered + * @param flattenPaths list of full paths of all leaf nodes relative to root */ - fun extractField(node: MutableMap, currentPath: String): Pair> { + fun traverseMappingsAndUpdate( + node: MutableMap, + currentPath: String, + processLeafFn: (String, MutableMap) -> Triple>, + flattenPaths: MutableList + ) { + // If node contains "properties" property then it is internal(non-leaf) node if (node.containsKey(PROPERTIES)) { - return extractField(node.get(PROPERTIES) as MutableMap, currentPath) - } else if (node.containsKey(NESTED)) { - return extractField(node.get(NESTED) as MutableMap, currentPath) - } else if (node.size == 1 && node.containsKey(TYPE) == false) { - val iter = node.iterator().next() - return extractField(iter.value as MutableMap, currentPath + "." + iter.key) - } else { - return Pair(currentPath, node) + return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processLeafFn, flattenPaths) + } else if (node.containsKey(TYPE) == false) { + // If there is no "type" property, this is either internal(non-leaf) node or leaf node + // newNodes will hold list of updated leaf properties + var newNodes = ArrayList>(node.size) + node.entries.forEach { + // Compute full path relative to root + val fullPath = if (currentPath.isEmpty()) it.key + else "$currentPath.${it.key}" + val nodeProps = it.value as MutableMap + // If it has type property and type is not "nested" then this is a leaf + if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { + // At this point we know full path of node, so we add it to output array + flattenPaths.add(fullPath) + // Calls processLeafFn and gets old node name, new node name and new properties of node. + // This is all information we need to update this node + val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) + newNodes.add(Triple(oldName, newName, props)) + } else { + // Internal(non-leaf) node - visit children + traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processLeafFn, flattenPaths) + } + } + // Here we can update all processed leaves in tree + newNodes.forEach { + // If we renamed leaf, we have to remove it first + if (it.first != it.second) { + node.remove(it.first) + } + // Put new properties of leaf + node.put(it.second, it.third) + } } } @@ -134,30 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } val indices = getIndexResponse.indices() + // Run through each backing index and apply appropriate mappings to query index indices?.forEach { indexName -> if (clusterState.routingTable.hasIndex(indexName)) { val indexMetadata = clusterState.metadata.index(indexName) if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { val properties = ( (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as Map> + as MutableMap ) - - val updatedProperties = properties.entries.associate { - var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) - val newProps = fieldProps - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { - mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, props: MutableMap): Triple> { + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } } } + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${indexName}_$monitorId" + } + return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableListOf() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + // Updated mappings ready to be applied on queryIndex + val updatedProperties = properties - if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" - "${fieldName}_${indexName}_$monitorId" to newProps - } val queryIndex = monitor.dataSources.queryIndex val updateMappingRequest = PutMappingRequest(queryIndex) @@ -170,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val indexRequests = mutableListOf() queries.forEach { var query = it.query - properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") + flattenPaths.forEach { fieldPath -> + query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:") } val indexRequest = IndexRequest(queryIndex) .id(it.id + "_${indexName}_$monitorId") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index bf6a2549d..89579fa7e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -34,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS +import java.util.Map import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -56,7 +57,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { }""" assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor - Assert.assertEquals(monitor.owner, "alerting") indexDoc(index, "1", testDoc) val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, true) @@ -130,37 +130,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } fun `test execute monitor with custom query index`() { - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5") + val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6") + val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5) + ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" val customQueryIndex = "custom_alerts_index" var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources(queryIndex = customQueryIndex) + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) ) val monitorResponse = createMonitor(monitor) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters val testDoc = """{ "message" : "This is an error from IAD region", - "source.port": 12345, + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", "test_strict_date_time" : "$testTime", - "test_field" : "us-west-2" + "test_field.some_other_field" : "us-west-2" }""" indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size) + } + + fun `test execute monitor with custom query index and nested mappings`() { + val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + + // We are verifying here that index with nested mappings and nested aliases + // won't break query matching + + // Create index mappings + val m: MutableMap = HashMap() + val m1: MutableMap = HashMap() + m1["title"] = Map.of("type", "text") + m1["category"] = Map.of("type", "keyword") + m["rule"] = Map.of("type", "nested", "properties", m1) + val properties = Map.of("properties", m) + client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias.field_a", "type=alias,path=message") + ).source(properties) ).get() + + // Put alias for nested fields + val mm: MutableMap = HashMap() + val mm1: MutableMap = HashMap() + mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title") + mm["rule"] = Map.of("type", "nested", "properties", mm1) + val properties1 = Map.of("properties", mm) client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias2", "type=alias,path=test_field") + ).source(properties1) ).get() + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) @@ -172,11 +255,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { .get() Assert.assertTrue(getAlertsResponse != null) Assert.assertTrue(getAlertsResponse.alerts.size == 1) - getAlertsResponse = client() - .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null)) - .get() - Assert.assertTrue(getAlertsResponse != null) - Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } fun `test execute monitor with custom query index and custom field mappings`() {