Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backports #1079 (Chained Alert Behaviour Changes ) #1092

Merged
merged 3 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 130 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -83,6 +84,26 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
workflow = workflow,
size = workflow.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
dataSources = dataSources
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
}
}

return workflow.triggers.associateWith { trigger ->
foundAlerts[trigger.id]?.firstOrNull()
}
}

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
Expand Down Expand Up @@ -257,18 +278,84 @@ class AlertService(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
associatedAlertIds: List<String>,
result: ChainedAlertTriggerRunResult,
alertError: AlertError? = null,
): Alert? {

val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)

else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = Alert.State.ACTIVE
currentAlert.copy(
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
startTime = Instant.now(),
lastNotificationTime = currentTime,
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = IndexUtils.alertIndexSchemaVersion,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}
}

fun updateActionResultsForBucketLevelAlert(
Expand Down Expand Up @@ -762,6 +849,37 @@ class AlertService(
return searchResponse
}

/**
* Searches for ACTIVE/ACKNOWLEDGED chained alerts in the workflow's alertIndex.
*
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(
workflow: Workflow,
size: Int,
dataSources: DataSources,
): SearchResponse {
val workflowId = workflow.id
val alertIndex = dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, ""))
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)

val searchRequest = SearchRequest(alertIndex)
.routing(workflowId)
.source(searchSourceBuilder)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
if (searchResponse.status() != RestStatus.OK) {
throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
return searchResponse
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
Expand Down
10 changes: 10 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
Expand Down Expand Up @@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) {
return result.triggered && !suppress
}

fun isChainedAlertTriggerActionable(
ctx: ChainedAlertTriggerExecutionContext,
result: ChainedAlertTriggerRunResult,
): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.script

import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.Workflow
import java.time.Instant
Expand All @@ -18,27 +19,10 @@ data class ChainedAlertTriggerExecutionContext(
val error: Exception? = null,
val trigger: ChainedAlertTrigger,
val alertGeneratingMonitors: Set<String>,
val monitorIdToAlertIdsMap: Map<String, Set<String>>
val monitorIdToAlertIdsMap: Map<String, Set<String>>,
val alert: Alert? = null
) {

constructor(
workflow: Workflow,
workflowRunResult: WorkflowRunResult,
trigger: ChainedAlertTrigger,
alertGeneratingMonitors: Set<String>,
monitorIdToAlertIdsMap: Map<String, Set<String>>
) :
this(
workflow,
workflowRunResult,
workflowRunResult.executionStartTime,
workflowRunResult.executionEndTime,
workflowRunResult.error,
trigger,
alertGeneratingMonitors,
monitorIdToAlertIdsMap
)

/**
* Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we
* translate the context to a Map of Strings to primitive types, which can be accessed without reflection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
}

fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX
else getAlertsRequest.alertIndex!!
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex!!
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN
else
alertIndex
}

fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,36 +150,69 @@ object CompositeWorkflowRunner : WorkflowRunner() {
error = lastErrorDelegateRun,
triggerResults = triggerResults
)
if (dataSources != null) {
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources!!)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForWorkflow(workflow, dataSources)
} catch (e: Exception) {
logger.error("Failed to fetch current alerts for workflow", e)
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading alerts for workflow: $id", e)
return workflowRunResult.copy(error = e)
}
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
val updatedAlerts = mutableListOf<Alert>()
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val currentAlert = currentAlerts[trigger]
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
periodStart = workflowRunResult.executionStartTime,
periodEnd = workflowRunResult.executionEndTime,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap,
alert = currentAlert
)
runChainedAlertTrigger(
monitorCtx,
workflow,
trigger,
executionId,
triggerCtx,
dryRun,
triggerResults,
updatedAlerts
)
}
if (!dryRun && workflow.id != Workflow.NO_ID && updatedAlerts.isNotEmpty()) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
updatedAlerts,
it,
routingId = workflow.id
)
runChainedAlertTrigger(dataSources, monitorCtx, workflow, trigger, executionId, triggerCtx, dryRun, triggerResults)
}
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()
)
}
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()
)
}
workflowRunResult.executionEndTime = Instant.now()

Expand Down Expand Up @@ -260,37 +293,30 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

private suspend fun runChainedAlertTrigger(
dataSources: DataSources,
monitorCtx: MonitorRunnerExecutionContext,
workflow: Workflow,
trigger: ChainedAlertTrigger,
executionId: String,
triggerCtx: ChainedAlertTriggerExecutionContext,
dryRun: Boolean,
triggerResults: MutableMap<String, ChainedAlertTriggerRunResult>,
updatedAlerts: MutableList<Alert>,
) {
val triggerRunResult = monitorCtx.triggerService!!.runChainedAlertTrigger(
workflow, trigger, triggerCtx.alertGeneratingMonitors, triggerCtx.monitorIdToAlertIdsMap
)
triggerResults[trigger.id] = triggerRunResult
if (triggerRunResult.triggered) {
if (monitorCtx.triggerService!!.isChainedAlertTriggerActionable(triggerCtx, triggerRunResult)) {
val actionCtx = triggerCtx
for (action in trigger.actions) {
triggerRunResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, workflow, dryRun)
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList()
)
if (!dryRun && workflow.id != Workflow.NO_ID) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
listOf(alert),
it,
routingId = workflow.id
)
}
}
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList(), triggerRunResult
)
if (alert != null) {
updatedAlerts.add(alert)
}
}

Expand Down
Loading