From 34b723010b8a8d177845144b2d1cb53521b09eb8 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 4 Nov 2022 18:45:59 +0100 Subject: [PATCH 1/9] example Signed-off-by: Petar Dzepina --- .../org/opensearch/alerting/MonitorDataSourcesIT.kt | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index f144a4215..c61bd1360 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest @@ -138,12 +139,19 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ "message" : "This is an error from IAD region", + "source.port": 12345, "test_strict_date_time" : "$testTime", "test_field" : "us-west-2" }""" + indexDoc(index, "1", testDoc) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor - indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias.field_a", "type=alias,path=message") + ).get() + val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) From 48a596b817d7fd775f12ad1e306673582c999a2d Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 4 Nov 2022 20:33:21 +0100 Subject: [PATCH 2/9] fixed updating mappings for queryIndex Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 31 ++++++++++++++++--- .../alerting/MonitorDataSourcesIT.kt | 5 +++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 98693def1..3ca2a9e24 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) { companion object { + + val PROPERTIES = "properties" + val NESTED = "nested" + val TYPE = "type" + @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } + /** + * From given index mapping node, extracts fieldName -> fieldProperties pair + */ + fun extractField(node: MutableMap, currentPath: String): Pair> { + if (node.containsKey(PROPERTIES)) { + return extractField(node.get(PROPERTIES) as MutableMap, currentPath) + } else if (node.containsKey(NESTED)) { + return extractField(node.get(NESTED) as MutableMap, currentPath) + } else if (node.size == 1 && node.containsKey(TYPE) == false) { + val iter = node.iterator().next() + return extractField(iter.value as MutableMap, currentPath + "." + iter.key) + } else { + return Pair(currentPath, node) + } + } + suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedProperties = properties.entries.associate { - val newVal = it.value.toMutableMap() + var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) + val newProps = fieldProps if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newVal[iter.key] = iter.value + newProps[iter.key] = iter.value } } } - if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" - "${it.key}_${indexName}_$monitorId" to newVal + + if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" + "${fieldName}_${indexName}_$monitorId" to newProps } val queryIndex = monitor.dataSources.queryIndex diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index c61bd1360..0b283101e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -151,6 +151,11 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { index ).source("test_alias.field_a", "type=alias,path=message") ).get() + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias2", "type=alias,path=test_field") + ).get() val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) From 9eeb91dc3145086d9a56d062e66439a91fd8b004 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 8 Nov 2022 19:47:45 +0100 Subject: [PATCH 3/9] bug fix to include nested props Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 95 +++++++++++++++---- .../alerting/MonitorDataSourcesIT.kt | 4 +- 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3ca2a9e24..7e3a9b736 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -103,19 +103,74 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ /** * From given index mapping node, extracts fieldName -> fieldProperties pair */ - fun extractField(node: MutableMap, currentPath: String): Pair> { + fun extractField( + node: MutableMap, + parent: MutableMap?, + currentPath: String + ): Pair> { if (node.containsKey(PROPERTIES)) { - return extractField(node.get(PROPERTIES) as MutableMap, currentPath) + return extractField(node.get(PROPERTIES) as MutableMap, null, currentPath) } else if (node.containsKey(NESTED)) { - return extractField(node.get(NESTED) as MutableMap, currentPath) - } else if (node.size == 1 && node.containsKey(TYPE) == false) { + return extractField(node.get(NESTED) as MutableMap, null, currentPath) + } else if (node.containsKey(TYPE) == false) { val iter = node.iterator().next() - return extractField(iter.value as MutableMap, currentPath + "." + iter.key) + return extractField(iter.value as MutableMap, node, currentPath + "." + iter.key) } else { return Pair(currentPath, node) } } + fun traverseMappingsAndUpdate( + inNode: MutableMap, + currentPath: String, + processNodeFn: (String, MutableMap) -> Triple> + ) { + if (inNode.containsKey(PROPERTIES)) { + return traverseMappingsAndUpdate(inNode.get(PROPERTIES) as MutableMap, currentPath, processNodeFn) + } else if (inNode.containsKey(TYPE) && inNode[TYPE] == NESTED) { + return traverseMappingsAndUpdate(inNode.get(NESTED) as MutableMap, currentPath, processNodeFn) + } else if (inNode.containsKey(TYPE) == false) { + var newNodes = ArrayList>(inNode.size) + inNode.entries.forEach { + val nodeProps = it.value as MutableMap + if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { + val (oldName, newName, props) = processNodeFn(it.key, it.value as MutableMap) + newNodes.add(Triple(oldName, newName, props)) + } else { + traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, it.key, processNodeFn) + } + } + newNodes.forEach { + if (it.first != it.second) { + inNode.remove(it.first) + } + inNode.put(it.second, it.third) + } + } + } + + fun processNode( + fieldName: String, + props: MutableMap, + ): Triple> { + val newProps = LinkedHashMap(props) + var newFieldName = fieldName +/* + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { + mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } + } + } + + if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" + "${fieldName}_${indexName}_$monitorId" to newProps +*/ + return Triple(fieldName, newFieldName, newProps) + } + suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -140,24 +195,30 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { val properties = ( (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as Map> + as MutableMap ) - val updatedProperties = properties.entries.associate { - var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) - val newProps = fieldProps - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { - mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + val nodeProcessor = + fun(fieldName: String, props: MutableMap): Triple> { + var newFieldName = fieldName + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } } } + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${indexName}_$monitorId" + } + return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } + traverseMappingsAndUpdate(properties, "", nodeProcessor) + + val updatedProperties = properties - if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" - "${fieldName}_${indexName}_$monitorId" to newProps - } val queryIndex = monitor.dataSources.queryIndex val updateMappingRequest = PutMappingRequest(queryIndex) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 0b283101e..278ac2fcf 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -140,6 +140,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val testDoc = """{ "message" : "This is an error from IAD region", "source.port": 12345, + "source.ip": 12345, + "source.ipv6": 12345, "test_strict_date_time" : "$testTime", "test_field" : "us-west-2" }""" @@ -149,7 +151,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias.field_a", "type=alias,path=message") + ).source("test_alias.field_a", "type=alias,path=source.port") ).get() client().admin().indices().putMapping( PutMappingRequest( From 70051c9f4046399befbcf3c24f5e691f6ed1a496 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 03:08:26 +0100 Subject: [PATCH 4/9] fixed bugs with traversal Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 82 ++++--------- .../alerting/MonitorDataSourcesIT.kt | 109 +++++++++++++++--- 2 files changed, 117 insertions(+), 74 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 7e3a9b736..435abe53b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -100,77 +100,37 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } - /** - * From given index mapping node, extracts fieldName -> fieldProperties pair - */ - fun extractField( - node: MutableMap, - parent: MutableMap?, - currentPath: String - ): Pair> { - if (node.containsKey(PROPERTIES)) { - return extractField(node.get(PROPERTIES) as MutableMap, null, currentPath) - } else if (node.containsKey(NESTED)) { - return extractField(node.get(NESTED) as MutableMap, null, currentPath) - } else if (node.containsKey(TYPE) == false) { - val iter = node.iterator().next() - return extractField(iter.value as MutableMap, node, currentPath + "." + iter.key) - } else { - return Pair(currentPath, node) - } - } - fun traverseMappingsAndUpdate( - inNode: MutableMap, + node: MutableMap, currentPath: String, - processNodeFn: (String, MutableMap) -> Triple> + processNodeFn: (String, MutableMap) -> Triple>, + flattenPaths: MutableList ) { - if (inNode.containsKey(PROPERTIES)) { - return traverseMappingsAndUpdate(inNode.get(PROPERTIES) as MutableMap, currentPath, processNodeFn) - } else if (inNode.containsKey(TYPE) && inNode[TYPE] == NESTED) { - return traverseMappingsAndUpdate(inNode.get(NESTED) as MutableMap, currentPath, processNodeFn) - } else if (inNode.containsKey(TYPE) == false) { - var newNodes = ArrayList>(inNode.size) - inNode.entries.forEach { + if (node.containsKey(PROPERTIES)) { + return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processNodeFn, flattenPaths) + } else if (node.containsKey(TYPE) == false) { + var newNodes = ArrayList>(node.size) + node.entries.forEach { + val fullPath = if (currentPath.isEmpty()) it.key + else "$currentPath.${it.key}" val nodeProps = it.value as MutableMap if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { + flattenPaths.add(fullPath) val (oldName, newName, props) = processNodeFn(it.key, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) } else { - traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, it.key, processNodeFn) + traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processNodeFn, flattenPaths) } } newNodes.forEach { if (it.first != it.second) { - inNode.remove(it.first) + node.remove(it.first) } - inNode.put(it.second, it.third) + node.put(it.second, it.third) } } } - fun processNode( - fieldName: String, - props: MutableMap, - ): Triple> { - val newProps = LinkedHashMap(props) - var newFieldName = fieldName -/* - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { - mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value - } - } - } - - if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" - "${fieldName}_${indexName}_$monitorId" to newProps -*/ - return Triple(fieldName, newFieldName, newProps) - } - suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -189,6 +149,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } val indices = getIndexResponse.indices() + // Run through each backing index and apply appropriate mappings to query index indices?.forEach { indexName -> if (clusterState.routingTable.hasIndex(indexName)) { val indexMetadata = clusterState.metadata.index(indexName) @@ -197,10 +158,9 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ (indexMetadata.mapping()?.sourceAsMap?.get("properties")) as MutableMap ) - + // Node processor function is used to process leaves of index mappings tree val nodeProcessor = fun(fieldName: String, props: MutableMap): Triple> { - var newFieldName = fieldName val newProps = props.toMutableMap() if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType @@ -215,8 +175,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } - traverseMappingsAndUpdate(properties, "", nodeProcessor) - + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableListOf() + traverseMappingsAndUpdate(properties, "", nodeProcessor, flattenPaths) + // Updated mappings ready to be applied on queryIndex val updatedProperties = properties val queryIndex = monitor.dataSources.queryIndex @@ -231,8 +193,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val indexRequests = mutableListOf() queries.forEach { var query = it.query - properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") + flattenPaths.forEach { fieldPath -> + query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:") } val indexRequest = IndexRequest(queryIndex) .id(it.id + "_${indexName}_$monitorId") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 278ac2fcf..0ec75583d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -31,6 +31,7 @@ import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS +import java.util.Map import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -126,39 +127,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } fun `test execute monitor with custom query index`() { - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5") + val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6") + val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5) + ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" val customQueryIndex = "custom_alerts_index" var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources(queryIndex = customQueryIndex) + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) ) val monitorResponse = createMonitor(monitor) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters val testDoc = """{ "message" : "This is an error from IAD region", - "source.port": 12345, - "source.ip": 12345, - "source.ipv6": 12345, + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", "test_strict_date_time" : "$testTime", - "test_field" : "us-west-2" + "test_field.some_other_field" : "us-west-2" }""" indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size) + } + + fun `test execute monitor with custom query index and nested mappings`() { + val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + + // We are verifying here that index with nested mappings and nested aliases + // won't break query matching + + // Create index mappings + val m: MutableMap = HashMap() + val m1: MutableMap = HashMap() + m1["title"] = Map.of("type", "text") + m1["category"] = Map.of("type", "keyword") + m["rule"] = Map.of("type", "nested", "properties", m1) + val properties = Map.of("properties", m) + client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias.field_a", "type=alias,path=source.port") + ).source(properties) ).get() + + // Put alias for nested fields + val mm: MutableMap = HashMap() + val mm1: MutableMap = HashMap() + mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title") + mm["rule"] = Map.of("type", "nested", "properties", mm1) + val properties1 = Map.of("properties", mm) client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias2", "type=alias,path=test_field") + ).source(properties1) ).get() + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) @@ -170,11 +252,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { .get() Assert.assertTrue(getAlertsResponse != null) Assert.assertTrue(getAlertsResponse.alerts.size == 1) - getAlertsResponse = client() - .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null)) - .get() - Assert.assertTrue(getAlertsResponse != null) - Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } fun `test execute monitor with custom query index and custom field mappings`() { From 599d40aadc6a915de407305eb0bc5df182ad12b0 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 03:24:07 +0100 Subject: [PATCH 5/9] empty commit Signed-off-by: Petar Dzepina From 3d965d8aa909c31e93aa523af31579458852422e Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 15:54:01 +0100 Subject: [PATCH 6/9] empty commit Signed-off-by: Petar Dzepina From fa40bd4dbef3bbd8c0ca1e7a1a203245b84d71ae Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 15:55:31 +0100 Subject: [PATCH 7/9] renamed nodeProcessor to leafNodeProcessor for more clarity Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 435abe53b..66a3d35b5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -103,11 +103,11 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ fun traverseMappingsAndUpdate( node: MutableMap, currentPath: String, - processNodeFn: (String, MutableMap) -> Triple>, + processLeafFn: (String, MutableMap) -> Triple>, flattenPaths: MutableList ) { if (node.containsKey(PROPERTIES)) { - return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processNodeFn, flattenPaths) + return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processLeafFn, flattenPaths) } else if (node.containsKey(TYPE) == false) { var newNodes = ArrayList>(node.size) node.entries.forEach { @@ -116,10 +116,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val nodeProps = it.value as MutableMap if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { flattenPaths.add(fullPath) - val (oldName, newName, props) = processNodeFn(it.key, it.value as MutableMap) + val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) } else { - traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processNodeFn, flattenPaths) + traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processLeafFn, flattenPaths) } } newNodes.forEach { @@ -159,7 +159,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ as MutableMap ) // Node processor function is used to process leaves of index mappings tree - val nodeProcessor = + val leafNodeProcessor = fun(fieldName: String, props: MutableMap): Triple> { val newProps = props.toMutableMap() if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { @@ -177,7 +177,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } // Traverse and update index mappings here while extracting flatten field paths val flattenPaths = mutableListOf() - traverseMappingsAndUpdate(properties, "", nodeProcessor, flattenPaths) + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) // Updated mappings ready to be applied on queryIndex val updatedProperties = properties From fe6e60dddbd8e6a512ac15c38a11ebf9e5ac767f Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 20:39:30 +0100 Subject: [PATCH 8/9] added comments Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 66a3d35b5..2d4edc65b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -100,32 +100,53 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } + /** + * Does a DFS traversal of index mappings tree. + * Calls processLeafFn on every leaf node. + * Populates flattenPaths list with full paths of leaf nodes + * @param node current node which we're visiting + * @param currentPath current node path from root node + * @param processLeafFn leaf processor function which is called on every leaf discovered + * @param flattenPaths list of full paths of all leaf nodes relative to root + */ fun traverseMappingsAndUpdate( node: MutableMap, currentPath: String, processLeafFn: (String, MutableMap) -> Triple>, flattenPaths: MutableList ) { + // If node contains "properties" property then it is internal(non-leaf) node if (node.containsKey(PROPERTIES)) { return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processLeafFn, flattenPaths) + // If there is no "type" property, this is either internal(non-leaf) node or leaf node } else if (node.containsKey(TYPE) == false) { + // newNodes will hold list of updated leaf properties var newNodes = ArrayList>(node.size) node.entries.forEach { + // Compute full path relative to root val fullPath = if (currentPath.isEmpty()) it.key else "$currentPath.${it.key}" val nodeProps = it.value as MutableMap + // If it has type property and type is not "nested" then this is a leaf if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { + // At this point we know full path of node, so we add it to output array flattenPaths.add(fullPath) + // Calls processLeafFn and gets old node name, new node name and new properties of node. + // This is all information we need to update this node val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) + // Internal(non-leaf) node - visit children } else { traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processLeafFn, flattenPaths) } } + // Here we can update all processed leaves in tree newNodes.forEach { + // If we renamed leaf, we have to remove it first if (it.first != it.second) { node.remove(it.first) } + // Put new properties of leaf node.put(it.second, it.third) } } @@ -159,6 +180,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ as MutableMap ) // Node processor function is used to process leaves of index mappings tree + // val leafNodeProcessor = fun(fieldName: String, props: MutableMap): Triple> { val newProps = props.toMutableMap() From f75899f67072470bad669c5e0ec03e57e3c97245 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 20:43:27 +0100 Subject: [PATCH 9/9] ktlint fixes Signed-off-by: Petar Dzepina --- .../org/opensearch/alerting/util/DocLevelMonitorQueries.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 2d4edc65b..3b036a382 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -118,8 +118,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // If node contains "properties" property then it is internal(non-leaf) node if (node.containsKey(PROPERTIES)) { return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processLeafFn, flattenPaths) - // If there is no "type" property, this is either internal(non-leaf) node or leaf node } else if (node.containsKey(TYPE) == false) { + // If there is no "type" property, this is either internal(non-leaf) node or leaf node // newNodes will hold list of updated leaf properties var newNodes = ArrayList>(node.size) node.entries.forEach { @@ -135,8 +135,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // This is all information we need to update this node val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) - // Internal(non-leaf) node - visit children } else { + // Internal(non-leaf) node - visit children traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processLeafFn, flattenPaths) } }