Skip to content

Commit

Permalink
fix for ERROR alert state generation in doc-level monitors (#768)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Jan 27, 2023
1 parent fa47702 commit daf8c7c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
return monitorResult.copy(error = AlertingException.wrap(e))
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor. Error: ${e.message}")
return monitorResult.copy(error = AlertingException.wrap(e))
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = monitorCtx.indexTimeout!!
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
Expand All @@ -109,6 +101,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsToQueries = mutableMapOf<String, MutableList<String>>()

try {
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = monitorCtx.indexTimeout!!
)

val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
Expand Down Expand Up @@ -174,14 +174,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e)
val alertingException = AlertingException.wrap(e)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}

monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
Expand Down Expand Up @@ -265,6 +264,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alerts.add(alert)
}

if (findingDocPairs.isEmpty() && monitorResult.error != null) {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(),
listOf(),
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError()
)
alerts.add(alert)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorCtx.maxActionableAlertCount,
monitorId = monitor.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PAT
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
Expand Down Expand Up @@ -180,6 +181,37 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
}

fun `test execute monitor input error`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", tags = listOf("test_tag"))
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

deleteIndex(testIndex)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val inputResults = output.stringMap("input_results")
assertTrue("Missing monitor error message", (inputResults?.get("error") as String).isNotEmpty())

val alerts = searchAlerts(monitor)
assertEquals("Alert not saved", 1, alerts.size)
assertEquals("Alert status is incorrect", Alert.State.ERROR, alerts[0].state)
}

fun `test execute monitor generates alerts and findings with per alert execution for actions`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down

0 comments on commit daf8c7c

Please sign in to comment.