Skip to content

Commit

Permalink
Add action and alert flow and findings schema and additional fixes (o…
Browse files Browse the repository at this point in the history
…pensearch-project#381)

Signed-off-by: Ashish Agrawal <[email protected]>
  • Loading branch information
lezzago committed Apr 10, 2022
1 parent 5b912f0 commit ebdecc6
Show file tree
Hide file tree
Showing 26 changed files with 782 additions and 178 deletions.
25 changes: 25 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -166,6 +168,29 @@ class AlertService(
}
}

// TODO: clean this up so it follows the proper alert management for doc monitors
fun composeDocLevelAlert(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
result: DocumentLevelTriggerRunResult,
alertError: AlertError?
): Alert {
val currentTime = Instant.now()

val actionExecutionResults = result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
return Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
actionExecutionResults = actionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = findings, relatedDocIds = relatedDocIds
)
}

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand All @@ -42,7 +43,6 @@ import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteDestinationAction
import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ import java.time.Instant
object BucketLevelMonitorRunner : MonitorRunner {
private val logger = LogManager.getLogger(javaClass)

override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean):
MonitorRunResult<BucketLevelTriggerRunResult> {
override suspend fun runMonitor(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

Expand Down Expand Up @@ -82,7 +87,12 @@ object BucketLevelMonitorRunner : MonitorRunner {
// in the final output of monitorResult which occurs when all pages have been exhausted.
// If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
// with different page counts.
val inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults)
val inputResults = monitorCtx.inputService!!.collectInputResults(
monitor,
periodStart,
periodEnd,
monitorResult.inputResults
)
if (firstIteration) {
firstPageOfInputResults = inputResults
firstIteration = false
Expand Down Expand Up @@ -149,7 +159,8 @@ object BucketLevelMonitorRunner : MonitorRunner {
// in favor of just using the currentAlerts as-is.
currentAlerts.forEach { (trigger, keysToAlertsMap) ->
if (triggerResults[trigger.id]?.error == null)
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)
?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
}

for (trigger in monitor.triggers) {
Expand Down Expand Up @@ -262,24 +273,39 @@ object BucketLevelMonitorRunner : MonitorRunner {
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
monitorCtx.alertService!!.saveAlerts(completedAlertsToUpdate.toList(), monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false)
monitorCtx.alertService!!.saveAlerts(
completedAlertsToUpdate.toList(),
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = false
)
}
}

return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

override suspend fun runAction(action: Action, ctx: TriggerExecutionContext, monitorCtx: MonitorRunnerExecutionContext, dryrun: Boolean): ActionRunResult {
override suspend fun runAction(
action: Action,
ctx: TriggerExecutionContext,
monitorCtx: MonitorRunnerExecutionContext,
dryrun: Boolean
): ActionRunResult {
return try {
val actionOutput = mutableMapOf<String, String>()
actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null)
MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx)
else ""
actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(monitorCtx.client!!, monitorCtx.xContentRegistry!!, action.destinationId)
val destination = AlertingConfigAccessor.getDestinationInfo(
monitorCtx.client!!,
monitorCtx.xContentRegistry!!,
action.destinationId
)
if (!destination.isAllowed(monitorCtx.allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}
Expand Down Expand Up @@ -323,8 +349,8 @@ object BucketLevelMonitorRunner : MonitorRunner {
if (totalActionableAlertCount > monitorCtx.maxActionableAlertCount) {
logger.debug(
"The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " +
"which exceeds the maximum of [$(monitorCtx.maxActionableAlertCount)]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " +
"for action execution."
"which exceeds the maximum of [${monitorCtx.maxActionableAlertCount}]. " +
"Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] for action execution."
)
return true
}
Expand Down
Loading

0 comments on commit ebdecc6

Please sign in to comment.