Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various bug fixes for Bucket-Level Alerting #164

Merged
merged 1 commit into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, xContentRegistry))
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerConsumers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.client.Client
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
Expand All @@ -37,6 +40,7 @@ import java.time.Instant
class InputService(
val client: Client,
val scriptService: ScriptService,
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry
) {

Expand All @@ -62,11 +66,13 @@ class InputService(
"period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli()
)
AggregationQueryRewriter.rewriteQuery(input.query, prevResult, monitor.triggers)
// Deep copy the query before passing it to rewriteQuery; otherwise monitor.input is modified directly,
// which causes a bug where the rewritten query persists on the Monitor across executions
val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
rewrittenQuery.toString(), searchParams
),
TemplateScript.CONTEXT
)
Expand Down Expand Up @@ -97,6 +103,13 @@ class InputService(
}
}

/**
 * Returns a copy of [query] built by writing it to an in-memory byte stream and
 * reading it back through a [NamedWriteableAwareStreamInput] backed by
 * [namedWriteableRegistry].
 *
 * @param query the search source to duplicate
 * @return a new [SearchSourceBuilder] constructed from the serialized bytes of [query]
 */
private fun deepCopyQuery(query: SearchSourceBuilder): SearchSourceBuilder {
    // Serialize the query into an in-memory buffer.
    val serialized = BytesStreamOutput().also { buffer -> query.writeTo(buffer) }
    // Wrap the raw bytes so named writeables inside the query can be resolved on read.
    val rawInput = serialized.bytes().streamInput()
    val registryAwareInput = NamedWriteableAwareStreamInput(rawInput, namedWriteableRegistry)
    return SearchSourceBuilder(registryAwareInput)
}

/**
* We moved the anomaly result index to the system index list, so common users can no longer query
* this index directly. This method stashes the current thread context to pass the security check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
val completedAlertsToUpdate = mutableSetOf<Alert>()
// Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
// New Alerts are ignored since they cannot be acknowledged yet.
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
Expand All @@ -470,6 +471,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

// Adding all the COMPLETED Alerts to a separate set and removing them if they get added
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -482,12 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
)
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorOrTriggerError
)
Expand All @@ -497,9 +500,18 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
}

val actionResult = runAction(action, actionCtx, dryrun)
// Keeping the throttled response separate from runAction for now since
// throttling is not supported for PER_EXECUTION
val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
} else {
runAction(action, actionCtx, dryrun)
}
triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) {
Expand Down Expand Up @@ -527,6 +539,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
}
Expand All @@ -548,6 +563,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
alertService.saveAlerts(completedAlertsToUpdate.toList(), retryPolicy, allowUpdatingAcknowledgedAlert = false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AggregationQueryRewriter {
* Add the bucket selector conditions for each trigger in input query. It also adds afterKeys from previous result
* for each trigger.
*/
fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List<Trigger>) {
fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List<Trigger>): SearchSourceBuilder {
triggers.forEach { trigger ->
if (trigger is BucketLevelTrigger) {
// add bucket selector pipeline aggregation for each trigger in query
Expand Down Expand Up @@ -65,6 +65,8 @@ class AggregationQueryRewriter {
}
}
}

return query
}

/**
Expand Down
Loading