Commit
DocLevelMonitor Error Alert revisit (opensearch-project#905) (opensearch-project#910)

* initial commit

Signed-off-by: Petar Dzepina <[email protected]>

* test fix

Signed-off-by: Petar Dzepina <[email protected]>

* refactored to process multiple alerts when clearing/moving

Signed-off-by: Petar Dzepina <[email protected]>

* added test with multiple old alerts

Signed-off-by: Petar Dzepina <[email protected]>

* test fix

Signed-off-by: Petar Dzepina <[email protected]>

* limited upserting error alerts only in alertsIndex

Signed-off-by: Petar Dzepina <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
(cherry picked from commit 9e9f765)
petardz authored May 9, 2023
1 parent 5d3c3f4 commit edc00ae
Showing 4 changed files with 329 additions and 8 deletions.
166 changes: 160 additions & 6 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -7,6 +7,7 @@ package org.opensearch.alerting

 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
+import org.opensearch.action.ActionListener
 import org.opensearch.action.DocWriteRequest
 import org.opensearch.action.bulk.BackoffPolicy
 import org.opensearch.action.bulk.BulkRequest
@@ -26,9 +27,11 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
 import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
 import org.opensearch.alerting.util.IndexUtils
+import org.opensearch.alerting.util.MAX_SEARCH_SIZE
 import org.opensearch.alerting.util.getBucketKeysHash
 import org.opensearch.client.Client
 import org.opensearch.common.bytes.BytesReference
+import org.opensearch.common.unit.TimeValue
 import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.XContentFactory
 import org.opensearch.common.xcontent.XContentHelper
@@ -46,12 +49,20 @@ import org.opensearch.commons.alerting.model.Trigger
 import org.opensearch.commons.alerting.model.action.AlertCategory
 import org.opensearch.core.xcontent.NamedXContentRegistry
 import org.opensearch.core.xcontent.XContentParser
+import org.opensearch.index.VersionType
 import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.reindex.BulkByScrollResponse
+import org.opensearch.index.reindex.DeleteByQueryAction
+import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
 import org.opensearch.rest.RestStatus
 import org.opensearch.search.builder.SearchSourceBuilder
 import org.opensearch.search.sort.SortOrder
 import java.time.Instant
 import java.util.UUID
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.coroutines.suspendCoroutine
 
 /** Service that handles CRUD operations for alerts */
 class AlertService(
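The new kotlin.coroutines imports back the ActionListener-to-coroutine bridge that moveClearedErrorAlertsToHistory uses below; the plugin's existing suspendUntil extension follows the same shape. A minimal sketch of the pattern, with awaitAction as a hypothetical name that is not part of this commit:

```kotlin
import org.opensearch.action.ActionListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Suspend until an OpenSearch ActionListener fires, surfacing onFailure as an exception.
suspend fun <T> awaitAction(block: (ActionListener<T>) -> Unit): T =
    suspendCoroutine { cont ->
        block(object : ActionListener<T> {
            override fun onResponse(response: T) = cont.resume(response)
            override fun onFailure(e: Exception) = cont.resumeWithException(e)
        })
    }
```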
@@ -62,6 +73,9 @@ class AlertService(

     companion object {
         const val MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT = 500
+        const val ERROR_ALERT_ID_PREFIX = "error-alert"
+
+        val ALERTS_SEARCH_TIMEOUT = TimeValue(5, TimeUnit.MINUTES)
     }
 
     private val logger = LogManager.getLogger(AlertService::class.java)
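Of the new companion-object members, ERROR_ALERT_ID_PREFIX replaces a local errorAlertIdPrefix that upsertMonitorErrorAlert defined before this change, and ALERTS_SEARCH_TIMEOUT bounds the new error-alert searches via cancelAfterTimeInterval, which cancels the search task server-side rather than merely timing out the client call; the same value is reused as the delete-by-query timeout further down. Condensed from the diff:

```kotlin
// Condensed from clearMonitorErrorAlert below: search only the configured alerts index
// and let the server cancel the search task after five minutes.
val searchRequest = SearchRequest(monitor.dataSources.alertsIndex)
searchRequest.cancelAfterTimeInterval = ALERTS_SEARCH_TIMEOUT
```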
@@ -306,18 +320,16 @@
     }
 
     suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
-        val errorAlertIdPrefix = "error-alert"
-        val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}"
+        val newErrorAlertId = "$ERROR_ALERT_ID_PREFIX-${monitor.id}-${UUID.randomUUID()}"
 
-        val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*")
+        val searchRequest = SearchRequest(monitor.dataSources.alertsIndex)
             .source(
                 SearchSourceBuilder()
                     .sort(Alert.START_TIME_FIELD, SortOrder.DESC)
                     .query(
                         QueryBuilders.boolQuery()
-                            .must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*"))
                             .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
-                            .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR))
+                            .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR.name))
                     )
             )
         val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
@@ -345,20 +357,162 @@
                     lastNotificationTime = currentTime
                 )
             } else {
-                existingErrorAlert.copy(startTime = Instant.now(), lastNotificationTime = currentTime)
+                existingErrorAlert.copy(lastNotificationTime = currentTime)
             }
         }
 
         val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex)
             .routing(alert.monitorId)
             .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
+            .opType(DocWriteRequest.OpType.INDEX)
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
             .id(alert.id)
 
         val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) }
         logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}")
     }
+
+    suspend fun clearMonitorErrorAlert(monitor: Monitor) {
+        val currentTime = Instant.now()
+        try {
+            val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}")
+                .source(
+                    SearchSourceBuilder()
+                        .size(MAX_SEARCH_SIZE)
+                        .sort(Alert.START_TIME_FIELD, SortOrder.DESC)
+                        .query(
+                            QueryBuilders.boolQuery()
+                                .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
+                                .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR.name))
+                        )
+                )
+            searchRequest.cancelAfterTimeInterval = ALERTS_SEARCH_TIMEOUT
+            val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
+            // If there's no error alert present, there's nothing to clear. We can stop here.
+            if (searchResponse.hits.totalHits.value == 0L) {
+                return
+            }
+
+            val indexRequests = mutableListOf<IndexRequest>()
+            searchResponse.hits.hits.forEach { hit ->
+                if (searchResponse.hits.totalHits.value > 1L) {
+                    logger.warn("Found [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}] while clearing")
+                }
+                // Deserialize first/latest Alert
+                val xcp = contentParser(hit.sourceRef)
+                val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version)
+
+                val updatedAlert = existingErrorAlert.copy(
+                    endTime = currentTime
+                )
+
+                indexRequests += IndexRequest(monitor.dataSources.alertsIndex)
+                    .routing(monitor.id)
+                    .id(updatedAlert.id)
+                    .source(updatedAlert.toXContentWithUser(XContentFactory.jsonBuilder()))
+                    .opType(DocWriteRequest.OpType.INDEX)
+            }
+
+            val bulkResponse: BulkResponse = client.suspendUntil {
+                bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.debug("Failed clearing error alert ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] Error Alerts successfully cleared. End time set to: $currentTime")
+            }
+        } catch (e: Exception) {
+            logger.error("Error clearing monitor error alerts for monitor [${monitor.id}]: ${ExceptionsHelper.detailedMessage(e)}")
+        }
+    }
+
+    /**
+     * Moves already cleared "error alerts" to history index.
+     * Error Alert is cleared when endTime timestamp is set, on first successful run after failed run
+     * */
+    suspend fun moveClearedErrorAlertsToHistory(monitorId: String, alertIndex: String, alertHistoryIndex: String) {
+        try {
+            val searchRequest = SearchRequest(alertIndex)
+                .source(
+                    SearchSourceBuilder()
+                        .size(MAX_SEARCH_SIZE)
+                        .query(
+                            QueryBuilders.boolQuery()
+                                .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))
+                                .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR.name))
+                                .must(QueryBuilders.existsQuery(Alert.END_TIME_FIELD))
+                        )
+                        .version(true) // Do we need this?
+                )
+            searchRequest.cancelAfterTimeInterval = ALERTS_SEARCH_TIMEOUT
+            val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
+
+            if (searchResponse.hits.totalHits.value == 0L) {
+                return
+            }
+
+            // Copy to history index
+
+            val copyRequests = mutableListOf<IndexRequest>()
+
+            searchResponse.hits.hits.forEach { hit ->
+
+                val xcp = contentParser(hit.sourceRef)
+                val alert = Alert.parse(xcp, hit.id, hit.version)
+
+                copyRequests.add(
+                    IndexRequest(alertHistoryIndex)
+                        .routing(alert.monitorId)
+                        .source(hit.sourceRef, XContentType.JSON)
+                        .version(hit.version)
+                        .versionType(VersionType.EXTERNAL_GTE)
+                        .id(hit.id)
+                        .timeout(MonitorRunnerService.monitorCtx.indexTimeout)
+                )
+            }
+
+            val bulkResponse: BulkResponse = client.suspendUntil {
+                bulk(BulkRequest().add(copyRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.error("Failed copying error alert [${item.id}] to history index [$alertHistoryIndex]")
+                    }
+                }
+                return
+            }
+
+            // Delete from alertIndex
+
+            val alertIds = searchResponse.hits.hits.map { it.id }
+
+            val deleteResponse: BulkByScrollResponse = suspendCoroutine { cont ->
+                DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                    .source(alertIndex)
+                    .filter(QueryBuilders.termsQuery("_id", alertIds))
+                    .refresh(true)
+                    .timeout(ALERTS_SEARCH_TIMEOUT)
+                    .execute(
+                        object : ActionListener<BulkByScrollResponse> {
+                            override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
+                            override fun onFailure(t: Exception) = cont.resumeWithException(t)
+                        }
+                    )
+            }
+            deleteResponse.bulkFailures.forEach {
+                logger.error("Failed deleting alert while moving cleared alerts: [${it.id}] cause: [${it.cause}] ")
+            }
+        } catch (e: Exception) {
+            logger.error("Failed moving cleared error alerts to history index: ${ExceptionsHelper.detailedMessage(e)}")
+        }
+    }
 
     suspend fun saveAlerts(
         dataSources: DataSources,
         alerts: List<Alert>,
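Taken together, the three AlertService methods above implement one error-alert lifecycle: upsert on a failed run (now writing only to the configured alertsIndex, per the commit message), stamp endTime on the next successful run, then archive. A sketch of how a runner drives them; handleRunResult is illustrative scaffolding, while the method names and signatures are taken from the diff:

```kotlin
// Illustrative wrapper; the real call sites are in DocumentLevelMonitorRunner below.
suspend fun handleRunResult(alertService: AlertService, monitor: Monitor, errorMessage: String) {
    if (errorMessage.isNotEmpty()) {
        // Failed run: create an error alert, or refresh lastNotificationTime on the existing one.
        alertService.upsertMonitorErrorAlert(monitor, errorMessage)
    } else {
        // Successful run: set endTime on any open error alerts...
        alertService.clearMonitorErrorAlert(monitor)
        // ...then copy the cleared alerts to the history index and delete the originals.
        monitor.dataSources.alertsHistoryIndex?.let { historyIndex ->
            alertService.moveClearedErrorAlertsToHistory(monitor.id, monitor.dataSources.alertsIndex, historyIndex)
        }
    }
}
```

Copying with VersionType.EXTERNAL_GTE and the source document's version lets a re-run of the move overwrite an earlier partial copy instead of failing on a version conflict, and deletion from the live index happens only if the whole bulk copy succeeds.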
alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -236,6 +236,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
         if (errorMessage.isNotEmpty()) {
             monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
+        } else {
+            onSuccessfulMonitorRun(monitorCtx, monitor)
         }
 
         MonitorMetadataService.upsertMetadata(
@@ -259,6 +261,17 @@
         }
     }
 
+    private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
+        monitorCtx.alertService!!.clearMonitorErrorAlert(monitor)
+        if (monitor.dataSources.alertsHistoryIndex != null) {
+            monitorCtx.alertService!!.moveClearedErrorAlertsToHistory(
+                monitor.id,
+                monitor.dataSources.alertsIndex,
+                monitor.dataSources.alertsHistoryIndex!!
+            )
+        }
+    }
+
     private fun constructErrorMessageFromTriggerResults(
         triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
     ): String {
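The success path is gated on constructErrorMessageFromTriggerResults returning an empty string. Its body is collapsed in this view; a plausible reconstruction — hypothetical, assuming only the nullable error field that TriggerRunResult carries — is:

```kotlin
// Hypothetical reconstruction of the collapsed helper: it only needs to yield a
// non-empty string when at least one trigger result carries an error.
private fun constructErrorMessageFromTriggerResults(
    triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
): String =
    triggerResults.orEmpty()
        .filterValues { it.error != null }
        .map { (triggerId, result) -> "[$triggerId]: [${result.error!!.message}]" }
        .joinToString(" | ")
```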
alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt
@@ -18,6 +18,8 @@ import org.opensearch.commons.alerting.util.isBucketLevelMonitor

 private val logger = LogManager.getLogger("AlertingUtils")
 
+val MAX_SEARCH_SIZE = 10000
+
 /**
  * RFC 5322 compliant pattern matching: https://www.ietf.org/rfc/rfc5322.txt
  * Regex was based off of this post: https://stackoverflow.com/a/201378
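MAX_SEARCH_SIZE caps the page size of the clear/move searches in AlertService. The value 10000 presumably mirrors the default index.max_result_window (an assumption, not stated in the commit), so results would silently truncate only for a monitor with more than that many error alerts. A hypothetical guard for that edge:

```kotlin
// Hypothetical, not part of the commit: warn when a single page cannot hold
// every matching error alert, since anything past MAX_SEARCH_SIZE is skipped.
fun warnIfTruncated(searchResponse: SearchResponse, monitorId: String) {
    if (searchResponse.hits.totalHits.value > MAX_SEARCH_SIZE) {
        logger.warn("Monitor [$monitorId] has more than $MAX_SEARCH_SIZE error alerts; only the first page will be processed")
    }
}
```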
(Diff for the fourth changed file — the remaining 154 additions and 2 deletions — was not loaded.)
