Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mappings traversal bug fix #669

Merged
merged 10 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,34 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
}

/**
* From given index mapping node, extracts fieldName -> fieldProperties pair
*/
fun extractField(node: MutableMap<String, Any>, currentPath: String): Pair<String, MutableMap<String, Any>> {
fun traverseMappingsAndUpdate(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments in the different logic flows of this function? It is kind of hard to follow what it is doing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

node: MutableMap<String, Any>,
currentPath: String,
processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
flattenPaths: MutableList<String>
) {
if (node.containsKey(PROPERTIES)) {
return extractField(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath)
} else if (node.containsKey(NESTED)) {
return extractField(node.get(NESTED) as MutableMap<String, Any>, currentPath)
} else if (node.size == 1 && node.containsKey(TYPE) == false) {
val iter = node.iterator().next()
return extractField(iter.value as MutableMap<String, Any>, currentPath + "." + iter.key)
} else {
return Pair(currentPath, node)
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
} else if (node.containsKey(TYPE) == false) {
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
flattenPaths.add(fullPath)
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
}
newNodes.forEach {
if (it.first != it.second) {
node.remove(it.first)
}
node.put(it.second, it.third)
}
}
}

Expand All @@ -134,30 +149,38 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
val indices = getIndexResponse.indices()

// Run through each backing index and apply appropriate mappings to query index
indices?.forEach { indexName ->
if (clusterState.routingTable.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
val properties = (
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
as Map<String, Map<String, Any>>
as MutableMap<String, Any>
)

val updatedProperties = properties.entries.associate {
var (fieldName, fieldProps) = extractField(it.value as MutableMap<String, Any>, it.key)
val newProps = fieldProps
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newProps[iter.key] = iter.value
// Node processor function is used to process leaves of index mappings tree
val leafNodeProcessor =
fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
val newProps = props.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) {
mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newProps[iter.key] = iter.value
}
}
}
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${indexName}_$monitorId"
}
return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps)
}
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableListOf<String>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
// Updated mappings ready to be applied on queryIndex
val updatedProperties = properties

if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId"
"${fieldName}_${indexName}_$monitorId" to newProps
}
val queryIndex = monitor.dataSources.queryIndex

val updateMappingRequest = PutMappingRequest(queryIndex)
Expand All @@ -170,8 +193,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
val indexRequests = mutableListOf<IndexRequest>()
queries.forEach {
var query = it.query
properties.forEach { prop ->
query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:")
flattenPaths.forEach { fieldPath ->
query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:")
}
val indexRequest = IndexRequest(queryIndex)
.id(it.id + "_${indexName}_$monitorId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Map
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

Expand All @@ -56,7 +57,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "alerting")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, true)
Expand Down Expand Up @@ -130,37 +130,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

fun `test execute monitor with custom query index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(queryIndex = customQueryIndex)
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
// Trying to test here few different "nesting" situations and "wierd" characters
val testDoc = """{
"message" : "This is an error from IAD region",
"source.port": 12345,
"source.ip.v6.v1" : 12345,
"source.ip.v6.v2" : 16645,
"source.ip.v4.v0" : 120,
"test_bad_char" : "\u0000",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
"test_field.some_other_field" : "us-west-2"
}"""
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and nested mappings`() {
val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)

// We are verifying here that index with nested mappings and nested aliases
// won't break query matching

// Create index mappings
val m: MutableMap<String, Any> = HashMap()
val m1: MutableMap<String, Any> = HashMap()
m1["title"] = Map.of("type", "text")
m1["category"] = Map.of("type", "keyword")
m["rule"] = Map.of("type", "nested", "properties", m1)
val properties = Map.of<String, Any>("properties", m)

client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias.field_a", "type=alias,path=message")
).source(properties)
).get()

// Put alias for nested fields
val mm: MutableMap<String, Any> = HashMap()
val mm1: MutableMap<String, Any> = HashMap()
mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title")
mm["rule"] = Map.of("type", "nested", "properties", mm1)
val properties1 = Map.of<String, Any>("properties", mm)
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias2", "type=alias,path=test_field")
).source(properties1)
).get()

val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)

client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Expand All @@ -172,11 +255,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and custom field mappings`() {
Expand Down