Skip to content

Commit

Permalink
mappings traversal bug fix (opensearch-project#669)
Browse files Browse the repository at this point in the history

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored and eirsep committed Nov 9, 2022
1 parent 51712a2 commit b03a5ee
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,54 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}

/**
* From given index mapping node, extracts fieldName -> fieldProperties pair
* Does a DFS traversal of index mappings tree.
* Calls processLeafFn on every leaf node.
* Populates flattenPaths list with full paths of leaf nodes
* @param node current node which we're visiting
* @param currentPath current node path from root node
* @param processLeafFn leaf processor function which is called on every leaf discovered
* @param flattenPaths list of full paths of all leaf nodes relative to root
*/
fun extractField(node: MutableMap<String, Any>, currentPath: String): Pair<String, MutableMap<String, Any>> {
fun traverseMappingsAndUpdate(
node: MutableMap<String, Any>,
currentPath: String,
processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
flattenPaths: MutableList<String>
) {
// If node contains "properties" property then it is internal(non-leaf) node
if (node.containsKey(PROPERTIES)) {
return extractField(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath)
} else if (node.containsKey(NESTED)) {
return extractField(node.get(NESTED) as MutableMap<String, Any>, currentPath)
} else if (node.size == 1 && node.containsKey(TYPE) == false) {
val iter = node.iterator().next()
return extractField(iter.value as MutableMap<String, Any>, currentPath + "." + iter.key)
} else {
return Pair(currentPath, node)
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
} else if (node.containsKey(TYPE) == false) {
// If there is no "type" property, this is either internal(non-leaf) node or leaf node
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
}
}

Expand All @@ -134,30 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
val indices = getIndexResponse.indices()

// Run through each backing index and apply appropriate mappings to query index
indices?.forEach { indexName ->
if (clusterState.routingTable.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
val properties = (
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
as Map<String, Map<String, Any>>
as MutableMap<String, Any>
)

val updatedProperties = properties.entries.associate {
var (fieldName, fieldProps) = extractField(it.value as MutableMap<String, Any>, it.key)
val newProps = fieldProps
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newProps[iter.key] = iter.value
// Node processor function is used to process leaves of index mappings tree
//
val leafNodeProcessor =
fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
val newProps = props.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) {
mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newProps[iter.key] = iter.value
}
}
}
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${indexName}_$monitorId"
}
return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps)
}
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableListOf<String>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
// Updated mappings ready to be applied on queryIndex
val updatedProperties = properties

if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId"
"${fieldName}_${indexName}_$monitorId" to newProps
}
val queryIndex = monitor.dataSources.queryIndex

val updateMappingRequest = PutMappingRequest(queryIndex)
Expand All @@ -170,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
val indexRequests = mutableListOf<IndexRequest>()
queries.forEach {
var query = it.query
properties.forEach { prop ->
query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:")
flattenPaths.forEach { fieldPath ->
query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:")
}
val indexRequest = IndexRequest(queryIndex)
.id(it.id + "_${indexName}_$monitorId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Map
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

Expand Down Expand Up @@ -129,37 +130,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

fun `test execute monitor with custom query index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(queryIndex = customQueryIndex)
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
// Trying to test here few different "nesting" situations and "wierd" characters
val testDoc = """{
"message" : "This is an error from IAD region",
"source.port": 12345,
"source.ip.v6.v1" : 12345,
"source.ip.v6.v2" : 16645,
"source.ip.v4.v0" : 120,
"test_bad_char" : "\u0000",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
"test_field.some_other_field" : "us-west-2"
}"""
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and nested mappings`() {
val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)

// We are verifying here that index with nested mappings and nested aliases
// won't break query matching

// Create index mappings
val m: MutableMap<String, Any> = HashMap()
val m1: MutableMap<String, Any> = HashMap()
m1["title"] = Map.of("type", "text")
m1["category"] = Map.of("type", "keyword")
m["rule"] = Map.of("type", "nested", "properties", m1)
val properties = Map.of<String, Any>("properties", m)

client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias.field_a", "type=alias,path=message")
).source(properties)
).get()

// Put alias for nested fields
val mm: MutableMap<String, Any> = HashMap()
val mm1: MutableMap<String, Any> = HashMap()
mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title")
mm["rule"] = Map.of("type", "nested", "properties", mm1)
val properties1 = Map.of<String, Any>("properties", mm)
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias2", "type=alias,path=test_field")
).source(properties1)
).get()

val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)

client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Expand All @@ -171,11 +255,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and custom field mappings`() {
Expand Down

0 comments on commit b03a5ee

Please sign in to comment.