diff --git a/alerting/build.gradle b/alerting/build.gradle index dc42cd126..5ca3c6ec0 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -92,6 +92,8 @@ dependencies { testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.mockito:mockito-core:4.7.0" + testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}" + testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}" } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 787f2371c..ee31fe1a3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.index.IndexRequest @@ -173,14 +174,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } } + monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) } catch (e: Exception) { - logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e) - val alertingException = AlertingException.wrap(e) - return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + logger.error("Failed to start Document-level-monitor ${monitor.name}", e) + val alertingException = AlertingException( + ExceptionsHelper.unwrapCause(e).cause?.message.toString(), + RestStatus.INTERNAL_SERVER_ERROR, + e + ) + monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } - monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) - /* populate the map queryToDocIds with pairs of @@ -531,15 +535,46 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return hits.map { hit -> val sourceMap = hit.sourceAsMap - var xContentBuilder = XContentFactory.jsonBuilder().startObject() - sourceMap.forEach { (k, v) -> - xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v) - } - xContentBuilder = xContentBuilder.endObject() + transformDocumentFieldNames(sourceMap, "_${index}_$monitorId") + + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) + logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) + Pair(hit.id, sourceRef) } } + + /** + * Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names. + * + * Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV: + * { { + * "a": { "a": { + * "b": 1234 ----> "b_my_log_index_TReewWdsf2gdJFV": 1234 + * } } + * } + * + * @param jsonAsMap Input JSON (as Map) + * @param fieldNameSuffix Field suffix which is appended to existing field name + */ + private fun transformDocumentFieldNames( + jsonAsMap: MutableMap, + fieldNameSuffix: String + ) { + val tempMap = mutableMapOf() + val it: MutableIterator> = jsonAsMap.entries.iterator() + while (it.hasNext()) { + val entry = it.next() + if (entry.value is Map<*, *>) { + transformDocumentFieldNames(entry.value as MutableMap, fieldNameSuffix) + } else if (entry.key.endsWith(fieldNameSuffix) == false) { + tempMap["${entry.key}$fieldNameSuffix"] = entry.value + it.remove() + } + } + jsonAsMap.putAll(tempMap) + } }