Skip to content

Commit

Permalink
Fixed transformation of document field names before running percolator search (#845)
Browse files Browse the repository at this point in the history

* Fixed transformation of document field names before running percolator search

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored Apr 6, 2023
1 parent 3f2e651 commit 702da92
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 27 deletions.
1 change: 1 addition & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ dependencies {
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
Expand Down Expand Up @@ -186,8 +187,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor ${monitor.name}. Error: ${e.message}", e)
val alertingException = AlertingException.wrap(e)
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
RestStatus.INTERNAL_SERVER_ERROR,
e
)
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}

Expand Down Expand Up @@ -390,25 +395,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

/**
 * Builds the initial last-run context for [index]: records the index name, its shard
 * count, and the max sequence number observed so far for each shard.
 *
 * @param createdRecently when true the index is brand new, so every shard is seeded
 *        with -1 to monitor the index from its beginning rather than from the
 *        current max sequence number.
 */
suspend fun createRunContext(
    clusterService: ClusterService,
    client: Client,
    index: String,
    createdRecently: Boolean = false
): HashMap<String, Any> {
    val shardCount = getShardsCount(clusterService, index)
    val runContext = hashMapOf<String, Any>(
        "index" to index,
        "shards_count" to shardCount
    )
    // One entry per shard, keyed by the shard id as a string.
    (0 until shardCount)
        .map { shardId -> shardId.toString() }
        .forEach { shard ->
            runContext[shard] = if (createdRecently) -1L else getMaxSeqNo(client, index, shard)
        }
    return runContext
}

// Checks if the index was created from the last execution run or when the monitor was last updated to ensure that
// new index is monitored from the beginning of that index
private fun createdRecently(
Expand Down Expand Up @@ -563,15 +549,46 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

var xContentBuilder = XContentFactory.jsonBuilder().startObject()
sourceMap.forEach { (k, v) ->
xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v)
}
xContentBuilder = xContentBuilder.endObject()
transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

val sourceRef = BytesReference.bytes(xContentBuilder)

logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString())

Pair(hit.id, sourceRef)
}
}

/**
 * Traverses a document recursively and appends [fieldNameSuffix] to the name of every
 * leaf field, so the document lines up with the suffixed field names stored in the
 * percolator query index.
 *
 * Example for index name my_log_index and Monitor ID TReewWdsf2gdJFV:
 *   {                         {
 *      "a": {                    "a": {
 *          "b": 1234 ---->           "b_my_log_index_TReewWdsf2gdJFV": 1234
 *      }                         }
 *   }                         }
 *
 * Objects are recursed into without renaming their own key (queries address dotted leaf
 * paths). Arrays containing objects are traversed element-by-element and keep their key;
 * arrays of scalars are treated as leaves and have their key renamed. Keys that already
 * carry the suffix are left untouched, making the transform idempotent.
 *
 * @param jsonAsMap Input JSON (as a mutable Map); transformed in place
 * @param fieldNameSuffix Field suffix which is appended to each existing leaf field name
 */
private fun transformDocumentFieldNames(
    jsonAsMap: MutableMap<String, Any>,
    fieldNameSuffix: String
) {
    val renamedFields = mutableMapOf<String, Any>()
    val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
    while (it.hasNext()) {
        val entry = it.next()
        val value = entry.value
        if (value is Map<*, *>) {
            // Nested object: recurse; the object's key itself is not renamed.
            @Suppress("UNCHECKED_CAST")
            transformDocumentFieldNames(value as MutableMap<String, Any>, fieldNameSuffix)
        } else if (value is List<*> && value.any { element -> element is Map<*, *> }) {
            // Array of objects: transform each object element, keep the array's key.
            // (Previously these were treated as scalar leaves, so fields inside
            // arrays of objects never received the suffix and could not match.)
            value.forEach { element ->
                if (element is Map<*, *>) {
                    @Suppress("UNCHECKED_CAST")
                    transformDocumentFieldNames(element as MutableMap<String, Any>, fieldNameSuffix)
                }
            }
        } else if (entry.key.endsWith(fieldNameSuffix) == false) {
            // Leaf (scalar, null, or scalar array): rename key by appending the suffix.
            // Collect into a side map to avoid ConcurrentModificationException.
            renamedFields["${entry.key}$fieldNameSuffix"] = entry.value
            it.remove()
        }
    }
    jsonAsMap.putAll(renamedFields)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
Expand Down Expand Up @@ -209,6 +210,117 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals("Didn't match all 8 queries", 8, findings[0].docLevelQueries.size)
}

// Verifies that a doc-level monitor percolates both nested (object-form) and dot-flattened
// documents against the same dotted-path queries, and that nested/join mappings and
// degenerate documents (empty objects, nulls, multi-valued fields) do not break the run.
fun `test execute monitor with non-flattened json doc as source`() {
    // One query addressing two leaf paths under the "source" object.
    val docQuery1 = DocLevelQuery(query = "source.device.port:12345 OR source.device.hwd.id:12345", name = "3")

    val docLevelInput = DocLevelMonitorInput(
        "description", listOf(index), listOf(docQuery1)
    )
    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val customFindingsIndex = "custom_findings_index"
    val customFindingsIndexPattern = "custom_findings_index-1"
    // NOTE(review): the value reads "custom_alerts_index" although it is used as the
    // query index name — consider renaming the literal for clarity.
    val customQueryIndex = "custom_alerts_index"
    var monitor = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput),
        triggers = listOf(trigger),
        dataSources = DataSources(
            queryIndex = customQueryIndex,
            findingsIndex = customFindingsIndex,
            findingsIndexPattern = customFindingsIndexPattern
        )
    )
    val monitorResponse = createMonitor(monitor)

    // Mapping declares the dotted field names as longs plus a nested field and a join field,
    // so "source", "source.device" and "source.device.hwd" become object fields.
    val mappings = """{
    "properties": {
        "source.device.port": { "type": "long" },
        "source.device.hwd.id": { "type": "long" },
        "nested_field": {
            "type": "nested",
            "properties": {
                "test1": {
                    "type": "keyword"
                }
            }
        },
        "my_join_field": {
            "type": "join",
            "relations": {
               "question": "answer"
            }
        }
    }
}"""

    client().admin().indices().putMapping(PutMappingRequest(index).source(mappings, XContentType.JSON)).get()
    // Sanity-check via field caps that the intermediate paths were mapped as objects.
    val getFieldCapabilitiesResp = client().fieldCaps(FieldCapabilitiesRequest().indices(index).fields("*")).get()
    assertTrue(getFieldCapabilitiesResp.getField("source").containsKey("object"))
    assertTrue(getFieldCapabilitiesResp.getField("source.device").containsKey("object"))
    assertTrue(getFieldCapabilitiesResp.getField("source.device.hwd").containsKey("object"))
    // testing both, nested and flatten documents
    val testDocuments = mutableListOf<String>()
    testDocuments += """{
        "source" : { "device": {"port" : 12345 } },
        "nested_field": { "test1": "some text" }
    }"""
    testDocuments += """{
        "source.device.port" : "12345"
    }"""
    testDocuments += """{
        "source.device.port" : 12345
    }"""
    testDocuments += """{
        "source" : { "device": {"hwd": { "id": 12345 } } }
    }"""
    testDocuments += """{
        "source.device.hwd.id" : 12345
    }"""
    // Document with join field
    testDocuments += """{
        "source" : { "device" : { "hwd": { "id" : 12345 } } },
        "my_join_field": { "name": "question" }
    }"""
    // Checking if these pointless but valid documents cause any issues
    testDocuments += """{
        "source" : {}
    }"""
    testDocuments += """{
        "source.device" : null
    }"""
    testDocuments += """{
        "source.device" : {}
    }"""
    testDocuments += """{
        "source.device.hwd" : {}
    }"""
    testDocuments += """{
        "source.device.hwd.id" : null
    }"""
    testDocuments += """{
        "some.multi.val.field" : [12345, 10, 11]
    }"""
    // Insert all documents
    for (i in testDocuments.indices) {
        indexDoc(index, "$i", testDocuments[i])
    }
    assertFalse(monitorResponse?.id.isNullOrEmpty())
    monitor = monitorResponse!!.monitor
    val id = monitorResponse.id
    // Execute the monitor synchronously (dryrun = false) and verify it ran the trigger.
    val executeMonitorResponse = executeMonitor(monitor, id, false)
    Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
    Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
    searchAlerts(id)
    val table = Table("asc", "id", null, 1, 0, "")
    var getAlertsResponse = client()
        .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
        .get()
    Assert.assertTrue(getAlertsResponse != null)
    Assert.assertTrue(getAlertsResponse.alerts.size == 1)
    // Exactly the first 6 documents (both nested and flattened forms) match the query;
    // the degenerate documents produce no findings.
    val findings = searchFindings(id, customFindingsIndex)
    assertEquals("Findings saved for test monitor", 6, findings.size)
    assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index old`() {
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.index.reindex.ReindexModulePlugin
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.join.ParentJoinModulePlugin
import org.opensearch.plugins.Plugin
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
Expand Down Expand Up @@ -229,7 +230,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
).get()

// Load the alerting plugin together with the module plugins the single-node tests rely on
// (reindex client, and parent-join for join-field mappings).
override fun getPlugins(): List<Class<out Plugin>> {
    val testPlugins: List<Class<out Plugin>> = listOf(
        AlertingPlugin::class.java,
        ReindexModulePlugin::class.java,
        ParentJoinModulePlugin::class.java
    )
    return testPlugins
}

override fun resetNodeAfterTest(): Boolean {
Expand Down

0 comments on commit 702da92

Please sign in to comment.