Skip to content

Commit

Permalink
Various bug fixes pertaining to throttling on PER_ALERT, saving COMPLETED Alerts and rewriting input query for Bucket-Level Monitors
Browse files Browse the repository at this point in the history

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi committed Sep 1, 2021
1 parent 6f7afa9 commit 4544ac9
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, xContentRegistry))
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerConsumers()
Expand Down
17 changes: 15 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.client.Client
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
Expand All @@ -37,6 +40,7 @@ import java.time.Instant
class InputService(
val client: Client,
val scriptService: ScriptService,
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry
) {

Expand All @@ -62,11 +66,13 @@ class InputService(
"period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli()
)
AggregationQueryRewriter.rewriteQuery(input.query, prevResult, monitor.triggers)
// Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly
// which causes a strange bug where the rewritten query persists on the Monitor across executions
val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
rewrittenQuery.toString(), searchParams
),
TemplateScript.CONTEXT
)
Expand Down Expand Up @@ -97,6 +103,13 @@ class InputService(
}
}

/**
 * Creates an independent copy of [query] by round-tripping it through its stream serialization.
 *
 * The query is written to an in-memory [BytesStreamOutput] and read back through a
 * [NamedWriteableAwareStreamInput] backed by [namedWriteableRegistry]. The caller hands the copy
 * to the query rewriter so that the Monitor's own input query is not modified in place.
 *
 * Both streams are closed once the copy has been fully constructed.
 *
 * @param query the search source to duplicate; it is only read, never modified
 * @return a new [SearchSourceBuilder] deserialized from the bytes written by [query]
 */
private fun deepCopyQuery(query: SearchSourceBuilder): SearchSourceBuilder =
    BytesStreamOutput().use { out ->
        query.writeTo(out)
        // The copy is fully read inside the inner block, before either stream is closed.
        NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry).use { sin ->
            SearchSourceBuilder(sin)
        }
    }

/**
* We moved anomaly result index to system index list. So common user could not directly query
* this index any more. This method will stash current thread context to pass security check.
Expand Down
27 changes: 22 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
val completedAlertsToUpdate = mutableSetOf<Alert>()
// Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
// New Alerts are ignored since they cannot be acknowledged yet.
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
Expand All @@ -470,6 +471,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

// Adding all the COMPLETED Alerts to a separate set and removing them if they get added
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -482,12 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
)
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorOrTriggerError
)
Expand All @@ -497,9 +500,18 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
}

val actionResult = runAction(action, actionCtx, dryrun)
// Keeping the throttled response separate from runAction for now since
// throttling is not supported for PER_EXECUTION
val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
} else {
runAction(action, actionCtx, dryrun)
}
triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) {
Expand Down Expand Up @@ -527,6 +539,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
}
Expand All @@ -548,6 +563,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
alertService.saveAlerts(completedAlertsToUpdate.toList(), retryPolicy, allowUpdatingAcknowledgedAlert = false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AggregationQueryRewriter {
* Add the bucket selector conditions for each trigger in input query. It also adds afterKeys from previous result
* for each trigger.
*/
fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List<Trigger>) {
fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List<Trigger>): SearchSourceBuilder {
triggers.forEach { trigger ->
if (trigger is BucketLevelTrigger) {
// add bucket selector pipeline aggregation for each trigger in query
Expand Down Expand Up @@ -65,6 +65,8 @@ class AggregationQueryRewriter {
}
}
}

return query
}

/**
Expand Down
Loading

0 comments on commit 4544ac9

Please sign in to comment.