Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chained Alert Behaviour Changes #1079

Merged
merged 3 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 130 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -83,6 +84,26 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
workflow = workflow,
size = workflow.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
dataSources = dataSources
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we try to resolve the older alert if this occurs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be a follow up. But if this is not expected, we can ignore this unless we see issues for this.

}
}

return workflow.triggers.associateWith { trigger ->
foundAlerts[trigger.id]?.firstOrNull()
}
}

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
Expand Down Expand Up @@ -257,18 +278,84 @@ class AlertService(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
associatedAlertIds: List<String>,
result: ChainedAlertTriggerRunResult,
alertError: AlertError? = null,
): Alert? {

val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)

else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = Alert.State.ACTIVE
currentAlert.copy(
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR
Alert(
startTime = Instant.now(),
lastNotificationTime = currentTime,
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = IndexUtils.alertIndexSchemaVersion,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}
}

fun updateActionResultsForBucketLevelAlert(
Expand Down Expand Up @@ -758,6 +845,37 @@ class AlertService(
return searchResponse
}

/**
* Searches for ACTIVE/ACKNOWLEDGED chained alerts in the workflow's alertIndex.
*
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(
workflow: Workflow,
size: Int,
dataSources: DataSources,
): SearchResponse {
val workflowId = workflow.id
val alertIndex = dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, ""))
Copy link
Collaborator

@sbcd90 sbcd90 Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can there be a doc which has both workflowId & monitorId field set with respective ids?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alert created from composite monitor trigger will have only empty monitorId.

The underlying alerts created in AUDIT state will have both monitorId and worklfowId.

val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)

val searchRequest = SearchRequest(alertIndex)
.routing(workflowId)
.source(searchSourceBuilder)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
if (searchResponse.status() != RestStatus.OK) {
throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
return searchResponse
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
Expand Down Expand Up @@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) {
return result.triggered && !suppress
}

fun isChainedAlertTriggerActionable(
ctx: ChainedAlertTriggerExecutionContext,
result: ChainedAlertTriggerRunResult,
): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.script

import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.Workflow
import java.time.Instant
Expand All @@ -18,27 +19,10 @@ data class ChainedAlertTriggerExecutionContext(
val error: Exception? = null,
val trigger: ChainedAlertTrigger,
val alertGeneratingMonitors: Set<String>,
val monitorIdToAlertIdsMap: Map<String, Set<String>>
val monitorIdToAlertIdsMap: Map<String, Set<String>>,
val alert: Alert? = null
) {

constructor(
workflow: Workflow,
workflowRunResult: WorkflowRunResult,
trigger: ChainedAlertTrigger,
alertGeneratingMonitors: Set<String>,
monitorIdToAlertIdsMap: Map<String, Set<String>>
) :
this(
workflow,
workflowRunResult,
workflowRunResult.executionStartTime,
workflowRunResult.executionEndTime,
workflowRunResult.error,
trigger,
alertGeneratingMonitors,
monitorIdToAlertIdsMap
)
Comment on lines -24 to -40
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we dont need the constructor anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the data class itself gives the default constructor
#1079 (comment)


/**
* Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we
* translate the context to a Map of Strings to primitive types, which can be accessed without reflection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
}

fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX
else getAlertsRequest.alertIndex!!
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex!!
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN
else
alertIndex
}

fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,36 +150,69 @@ object CompositeWorkflowRunner : WorkflowRunner() {
error = lastErrorDelegateRun,
triggerResults = triggerResults
)
if (dataSources != null) {
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources!!)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForWorkflow(workflow, dataSources)
} catch (e: Exception) {
logger.error("Failed to fetch current alerts for workflow", e)
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading alerts for workflow: $id", e)
return workflowRunResult.copy(error = e)
}
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this since we do this on line 154?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary prolly, but I think this would be consistent practice to all other monitors where we ensure alertIndex is present before we fetch current alert.

val updatedAlerts = mutableListOf<Alert>()
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val currentAlert = currentAlerts[trigger]
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
periodStart = workflowRunResult.executionStartTime,
periodEnd = workflowRunResult.executionEndTime,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap,
alert = currentAlert
)
runChainedAlertTrigger(
monitorCtx,
workflow,
trigger,
executionId,
triggerCtx,
dryRun,
triggerResults,
updatedAlerts
)
}
if (!dryRun && workflow.id != Workflow.NO_ID && updatedAlerts.isNotEmpty()) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
updatedAlerts,
it,
routingId = workflow.id
)
runChainedAlertTrigger(dataSources, monitorCtx, workflow, trigger, executionId, triggerCtx, dryRun, triggerResults)
}
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()
)
}
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()
)
}
workflowRunResult.executionEndTime = Instant.now()

Expand Down Expand Up @@ -260,37 +293,30 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

private suspend fun runChainedAlertTrigger(
dataSources: DataSources,
monitorCtx: MonitorRunnerExecutionContext,
workflow: Workflow,
trigger: ChainedAlertTrigger,
executionId: String,
triggerCtx: ChainedAlertTriggerExecutionContext,
dryRun: Boolean,
triggerResults: MutableMap<String, ChainedAlertTriggerRunResult>,
updatedAlerts: MutableList<Alert>,
) {
val triggerRunResult = monitorCtx.triggerService!!.runChainedAlertTrigger(
workflow, trigger, triggerCtx.alertGeneratingMonitors, triggerCtx.monitorIdToAlertIdsMap
)
triggerResults[trigger.id] = triggerRunResult
if (triggerRunResult.triggered) {
if (monitorCtx.triggerService!!.isChainedAlertTriggerActionable(triggerCtx, triggerRunResult)) {
val actionCtx = triggerCtx
for (action in trigger.actions) {
triggerRunResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, workflow, dryRun)
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList()
)
if (!dryRun && workflow.id != Workflow.NO_ID) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
listOf(alert),
it,
routingId = workflow.id
)
}
}
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList(), triggerRunResult
)
if (alert != null) {
updatedAlerts.add(alert)
}
}

Expand Down
Loading