Skip to content

Commit

Permalink
Manual backport of changes in PR 845. (#943)
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt authored May 31, 2023
1 parent 0dc7e0d commit fab48fb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ dependencies {

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:4.7.0"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
Expand Down Expand Up @@ -173,14 +174,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e)
val alertingException = AlertingException.wrap(e)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
RestStatus.INTERNAL_SERVER_ERROR,
e
)
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}

monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
Expand Down Expand Up @@ -531,15 +535,46 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

var xContentBuilder = XContentFactory.jsonBuilder().startObject()
sourceMap.forEach { (k, v) ->
xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v)
}
xContentBuilder = xContentBuilder.endObject()
transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

val sourceRef = BytesReference.bytes(xContentBuilder)

logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString())

Pair(hit.id, sourceRef)
}
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names.
*
* Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV:
* { {
* "a": { "a": {
* "b": 1234 ----> "b_my_log_index_TReewWdsf2gdJFV": 1234
* } }
* }
*
* @param jsonAsMap Input JSON (as Map)
* @param fieldNameSuffix Field suffix which is appended to existing field name
*/
private fun transformDocumentFieldNames(
jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
}
}
jsonAsMap.putAll(tempMap)
}
}

0 comments on commit fab48fb

Please sign in to comment.