DocLevelMonitor Error Alert - rework (#892)
A dedicated "Error Alert" is created for a monitor when an error occurs during a monitor run. The alert's state is ERROR, and subsequent errors upsert this "Error Alert". If the new error message differs from the previous one, the errorMessage field is overwritten and the previous error is added to the errorHistory rolling array (10 elements).
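
A minimal, self-contained Kotlin sketch of that rolling errorHistory behavior (the AlertError data class, the updateRolling helper, MAX_ERROR_HISTORY, and the main() driver are illustrative assumptions for this sketch only; the actual change below uses the plugin's Alert model and an errorHistory.update(...) extension):

import java.time.Instant

// Illustrative stand-in for the plugin's AlertError model.
data class AlertError(val timestamp: Instant, val message: String)

// Hypothetical helper: prepend the previous error and keep at most the 10 newest entries,
// mirroring the "rolling array (10 elements)" described above.
private const val MAX_ERROR_HISTORY = 10

fun List<AlertError>.updateRolling(previous: AlertError): List<AlertError> =
    (listOf(previous) + this).take(MAX_ERROR_HISTORY)

fun main() {
    var errorMessage = "shard failure"
    var errorHistory = emptyList<AlertError>()

    // A different error arrives: roll the previous error into history and overwrite errorMessage.
    val newMessage = "index not found"
    if (newMessage != errorMessage) {
        errorHistory = errorHistory.updateRolling(AlertError(Instant.now(), errorMessage))
        errorMessage = newMessage
    }

    println("errorMessage=$errorMessage, history=${errorHistory.map { it.message }}")
}

The sketch reflects the behavior described above: a single ERROR-state alert per monitor is reused, and its history only grows when the error message actually changes.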
---------

Signed-off-by: Petar Dzepina <[email protected]>
petardz authored May 5, 2023
1 parent 8c033b9 commit 461e95f
Showing 6 changed files with 266 additions and 58 deletions.
70 changes: 70 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
@@ -40,13 +41,15 @@ import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import java.time.Instant
import java.util.UUID

@@ -187,6 +190,19 @@ class AlertService(
)
}

    fun composeMonitorErrorAlert(
        id: String,
        monitor: Monitor,
        alertError: AlertError
    ): Alert {
        val currentTime = Instant.now()
        return Alert(
            id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
            lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
            schemaVersion = IndexUtils.alertIndexSchemaVersion
        )
    }

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
@@ -281,6 +297,60 @@ class AlertService(
} ?: listOf()
}

    suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
        val errorAlertIdPrefix = "error-alert"
        val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}"

        val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*")
            .source(
                SearchSourceBuilder()
                    .sort(Alert.START_TIME_FIELD, SortOrder.DESC)
                    .query(
                        QueryBuilders.boolQuery()
                            .must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*"))
                            .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
                            .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR))
                    )
            )
        val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

        var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))

        if (searchResponse.hits.totalHits.value > 0L) {
            if (searchResponse.hits.totalHits.value > 1L) {
                logger.warn("There are [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}]")
            }
            // Deserialize first/latest Alert
            val hit = searchResponse.hits.hits[0]
            val xcp = contentParser(hit.sourceRef)
            val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version)

            val currentTime = Instant.now()
            alert = if (alert.errorMessage != existingErrorAlert.errorMessage) {
                var newErrorHistory = existingErrorAlert.errorHistory.update(
                    AlertError(existingErrorAlert.startTime, existingErrorAlert.errorMessage!!)
                )
                alert.copy(
                    id = existingErrorAlert.id,
                    errorHistory = newErrorHistory,
                    startTime = currentTime,
                    lastNotificationTime = currentTime
                )
            } else {
                existingErrorAlert.copy(startTime = Instant.now(), lastNotificationTime = currentTime)
            }
        }

        val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex)
            .routing(alert.monitorId)
            .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
            .opType(DocWriteRequest.OpType.INDEX)
            .id(alert.id)

        val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) }
        logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}")
    }

suspend fun saveAlerts(
dataSources: DataSources,
alerts: List<Alert>,
@@ -20,6 +20,7 @@ import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
@@ -192,63 +193,88 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
RestStatus.INTERNAL_SERVER_ERROR,
e
)
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
}
}
}

val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }

val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
)
}
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
}

MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
}

// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = triggerResults)
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage)
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
errorMessage,
RestStatus.INTERNAL_SERVER_ERROR,
e
)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = triggerResults)
    private fun constructErrorMessageFromTriggerResults(
        triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
    ): String {
        var errorMessage = ""
        if (triggerResults != null) {
            val triggersErrorBuilder = StringBuilder()
            triggerResults.forEach {
                if (it.value.error != null) {
                    triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ")
                }
            }
            if (triggersErrorBuilder.isNotEmpty()) {
                errorMessage = "Trigger errors: $triggersErrorBuilder"
            }
        }
        return errorMessage
    }

private suspend fun runForEachDocTrigger(
@@ -295,16 +321,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alerts.add(alert)
}

if (findingDocPairs.isEmpty() && monitorResult.error != null) {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(),
listOf(),
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError()
)
alerts.add(alert)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorCtx.maxActionableAlertCount,
monitorId = monitor.id,
@@ -576,8 +592,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)

val response: SearchResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
var response: SearchResponse
try {
response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
} catch (e: Exception) {
throw IllegalStateException(
"Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e
)
}

if (response.status() !== RestStatus.OK) {
@@ -351,6 +351,7 @@ class AlertIndices(
}
if (existsResponse.isExists) return true

logger.debug("index: [$index] schema mappings: [$schemaMapping]")
val request = CreateIndexRequest(index)
.mapping(schemaMapping)
.settings(Settings.builder().put("index.hidden", true).build())
@@ -474,7 +475,7 @@ class AlertIndices(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty()) {
if (clusterStateResponse.state.metadata.indices.isNotEmpty()) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old $tag indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
@@ -283,6 +283,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)
indexRequests.add(indexRequest)
}
log.debug("bulk inserting percolate [${queries.size}] queries")
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = client.suspendUntil {
client.bulk(