Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 892 to 2.x #907

Merged
merged 1 commit into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
Expand Down Expand Up @@ -40,13 +41,15 @@ import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import java.time.Instant
import java.util.UUID

Expand Down Expand Up @@ -193,6 +196,19 @@ class AlertService(
)
}

/**
 * Builds a new ERROR-state [Alert] for the given monitor, carrying the supplied error message.
 *
 * The alert is created with a [NoOpTrigger] since monitor-level errors are not tied to any
 * user-defined trigger, and both startTime and lastNotificationTime are stamped with the
 * same "now" instant so the alert's timestamps are internally consistent.
 *
 * @param id document id to assign to the alert (caller controls upsert identity)
 * @param monitor the monitor this error alert belongs to
 * @param alertError the error whose message is surfaced on the alert
 * @return a new [Alert] in [Alert.State.ERROR]
 */
fun composeMonitorErrorAlert(
    id: String,
    monitor: Monitor,
    alertError: AlertError
): Alert {
    val currentTime = Instant.now()
    return Alert(
        // alertError is non-nullable, so access message directly (the original `?.` was redundant)
        id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
        lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError.message,
        schemaVersion = IndexUtils.alertIndexSchemaVersion
    )
}

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down Expand Up @@ -289,6 +305,60 @@ class AlertService(
} ?: listOf()
}

/**
 * Creates or updates the single monitor-level error [Alert] for [monitor].
 *
 * Searches the monitor's alerts index (including history indices via the `*` suffix) for an
 * existing ERROR-state alert whose id starts with the `error-alert` prefix. If one exists:
 *  - when the error message changed, the previous error is pushed into errorHistory and the
 *    alert is refreshed with the new message and timestamps (keeping the existing doc id so
 *    the index operation overwrites it);
 *  - when the message is unchanged, only the timestamps are refreshed.
 * Otherwise a brand-new error alert is indexed under a freshly generated id.
 *
 * @param monitor the monitor the error alert belongs to
 * @param errorMessage the current error to record on the alert
 */
suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
    val errorAlertIdPrefix = "error-alert"
    val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}"

    // Newest-first so hits[0] is the latest error alert for this monitor.
    val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*")
        .source(
            SearchSourceBuilder()
                .sort(Alert.START_TIME_FIELD, SortOrder.DESC)
                .query(
                    QueryBuilders.boolQuery()
                        .must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*"))
                        .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
                        .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR))
                )
        )
    val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

    var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))

    if (searchResponse.hits.totalHits.value > 0L) {
        if (searchResponse.hits.totalHits.value > 1L) {
            // Should be at most one; warn so operators can investigate duplicates.
            logger.warn("There are [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}]")
        }
        // Deserialize first/latest Alert
        val hit = searchResponse.hits.hits[0]
        val xcp = contentParser(hit.sourceRef)
        val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version)

        val currentTime = Instant.now()
        alert = if (alert.errorMessage != existingErrorAlert.errorMessage) {
            // The search filters on State.ERROR, so the existing alert's errorMessage
            // should be non-null here; `!!` documents that assumption.
            val newErrorHistory = existingErrorAlert.errorHistory.update(
                AlertError(existingErrorAlert.startTime, existingErrorAlert.errorMessage!!)
            )
            alert.copy(
                id = existingErrorAlert.id,
                errorHistory = newErrorHistory,
                startTime = currentTime,
                lastNotificationTime = currentTime
            )
        } else {
            // Reuse the same captured instant for both fields so timestamps stay consistent
            // (the original called Instant.now() again for startTime).
            existingErrorAlert.copy(startTime = currentTime, lastNotificationTime = currentTime)
        }
    }

    // OpType.INDEX + explicit id performs an upsert (overwrites the existing doc if present).
    val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex)
        .routing(alert.monitorId)
        .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
        .opType(DocWriteRequest.OpType.INDEX)
        .id(alert.id)

    val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) }
    logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}")
}

suspend fun saveAlerts(
dataSources: DataSources,
alerts: List<Alert>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
Expand Down Expand Up @@ -192,53 +193,88 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
}
}

val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }

val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
)
}
}
// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
}

MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = triggerResults)
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage)
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
errorMessage,
RestStatus.INTERNAL_SERVER_ERROR,
e
)
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}
}

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
private fun constructErrorMessageFromTriggerResults(
triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
): String {
var errorMessage = ""
if (triggerResults != null) {
val triggersErrorBuilder = StringBuilder()
triggerResults.forEach {
if (it.value.error != null) {
triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ")
}
}
if (triggersErrorBuilder.isNotEmpty()) {
errorMessage = "Trigger errors: $triggersErrorBuilder"
}
}

val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }

val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
)
}

// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = triggerResults)
return errorMessage
}

private suspend fun runForEachDocTrigger(
Expand Down Expand Up @@ -285,16 +321,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alerts.add(alert)
}

if (findingDocPairs.isEmpty() && monitorResult.error != null) {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(),
listOf(),
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError()
)
alerts.add(alert)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorCtx.maxActionableAlertCount,
monitorId = monitor.id,
Expand Down Expand Up @@ -567,8 +593,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)

val response: SearchResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
var response: SearchResponse
try {
response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
} catch (e: Exception) {
throw IllegalStateException(
"Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e
)
}

if (response.status() !== RestStatus.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,18 @@ class AlertIndices(
}

@Volatile private var alertHistoryEnabled = AlertingSettings.ALERT_HISTORY_ENABLED.get(settings)

@Volatile private var findingHistoryEnabled = AlertingSettings.FINDING_HISTORY_ENABLED.get(settings)

@Volatile private var alertHistoryMaxDocs = AlertingSettings.ALERT_HISTORY_MAX_DOCS.get(settings)

@Volatile private var findingHistoryMaxDocs = AlertingSettings.FINDING_HISTORY_MAX_DOCS.get(settings)

@Volatile private var alertHistoryMaxAge = AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.get(settings)

@Volatile private var findingHistoryMaxAge = AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE.get(settings)

@Volatile private var alertHistoryRolloverPeriod = AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.get(settings)

@Volatile private var findingHistoryRolloverPeriod = AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD.get(settings)

@Volatile private var alertHistoryRetentionPeriod = AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.get(settings)

@Volatile private var findingHistoryRetentionPeriod = AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD.get(settings)

@Volatile private var requestTimeout = AlertingSettings.REQUEST_TIMEOUT.get(settings)
Expand Down Expand Up @@ -454,25 +449,17 @@ class AlertIndices(

private fun rolloverAlertHistoryIndex() {
rolloverIndex(
alertHistoryIndexInitialized,
ALERT_HISTORY_WRITE_INDEX,
ALERT_HISTORY_INDEX_PATTERN,
alertMapping(),
alertHistoryMaxDocs,
alertHistoryMaxAge,
ALERT_HISTORY_WRITE_INDEX
alertHistoryIndexInitialized, ALERT_HISTORY_WRITE_INDEX,
ALERT_HISTORY_INDEX_PATTERN, alertMapping(),
alertHistoryMaxDocs, alertHistoryMaxAge, ALERT_HISTORY_WRITE_INDEX
)
}

private fun rolloverFindingHistoryIndex() {
rolloverIndex(
findingHistoryIndexInitialized,
FINDING_HISTORY_WRITE_INDEX,
FINDING_HISTORY_INDEX_PATTERN,
findingMapping(),
findingHistoryMaxDocs,
findingHistoryMaxAge,
FINDING_HISTORY_WRITE_INDEX
findingHistoryIndexInitialized, FINDING_HISTORY_WRITE_INDEX,
FINDING_HISTORY_INDEX_PATTERN, findingMapping(),
findingHistoryMaxDocs, findingHistoryMaxAge, FINDING_HISTORY_WRITE_INDEX
)
}

Expand All @@ -488,7 +475,7 @@ class AlertIndices(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
if (clusterStateResponse.state.metadata.indices.isNotEmpty()) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old $tag indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
Expand Down Expand Up @@ -523,7 +510,7 @@ class AlertIndices(
): String? {
val creationTime = indexMetadata.creationDate
if ((Instant.now().toEpochMilli() - creationTime) > retentionPeriodMillis) {
val alias = indexMetadata.aliases.firstOrNull { writeIndex == it.value.alias }
val alias = indexMetadata.aliases.entries.firstOrNull { writeIndex == it.value.alias }
if (alias != null) {
if (historyEnabled) {
// If the index has the write alias and history is enabled, don't delete the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)
indexRequests.add(indexRequest)
}
log.debug("bulk inserting percolate [${queries.size}] queries")
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = client.suspendUntil {
client.bulk(
Expand Down
Loading