From 4544ac92699af0571db123d68ee76b6c5c6efcc9 Mon Sep 17 00:00:00 2001 From: Mohammad Qureshi Date: Thu, 12 Aug 2021 17:47:59 -0700 Subject: [PATCH] Various bug fixes pertaining to throttling on PER_ALERT, saving COMPLETED Alerts and rewriting input query for Bucket-Level Monitors Signed-off-by: Mohammad Qureshi --- .../org/opensearch/alerting/AlertingPlugin.kt | 2 +- .../org/opensearch/alerting/InputService.kt | 17 +- .../org/opensearch/alerting/MonitorRunner.kt | 27 ++- .../alerting/util/AggregationQueryRewriter.kt | 4 +- .../opensearch/alerting/MonitorRunnerIT.kt | 228 +++++++++++++++++- .../org/opensearch/alerting/TestHelpers.kt | 4 +- 6 files changed, 264 insertions(+), 18 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e18bc777f..6c74c725e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -258,7 +258,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerSettings(settings) .registerThreadPool(threadPool) .registerAlertIndices(alertIndices) - .registerInputService(InputService(client, scriptService, xContentRegistry)) + .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry)) .registerTriggerService(TriggerService(scriptService)) .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerConsumers() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index eba142cea..a389b8457 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -23,6 +23,9 @@ import org.opensearch.alerting.model.TriggerAfterKey import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.client.Client +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentType @@ -37,6 +40,7 @@ import java.time.Instant class InputService( val client: Client, val scriptService: ScriptService, + val namedWriteableRegistry: NamedWriteableRegistry, val xContentRegistry: NamedXContentRegistry ) { @@ -62,11 +66,13 @@ class InputService( "period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli() ) - AggregationQueryRewriter.rewriteQuery(input.query, prevResult, monitor.triggers) + // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly + // which causes a strange bug where the rewritten query persists on the Monitor across executions + val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers) val searchSource = scriptService.compile( Script( ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, - input.query.toString(), searchParams + rewrittenQuery.toString(), searchParams ), TemplateScript.CONTEXT ) @@ -97,6 +103,13 @@ class InputService( } } + private fun deepCopyQuery(query: SearchSourceBuilder): SearchSourceBuilder { + val out = BytesStreamOutput() + query.writeTo(out) + val sin = NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry) + return SearchSourceBuilder(sin) + } + /** * We moved anomaly result index to system index list. So common user could not directly query * this index any more. This method will stash current thread context to pass security check. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 03c24a9d5..c8cf9fa04 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -460,6 +460,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { for (trigger in monitor.triggers) { val alertsToUpdate = mutableSetOf() + val completedAlertsToUpdate = mutableSetOf() // Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them. // New Alerts are ignored since they cannot be acknowledged yet. val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) @@ -470,6 +471,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() + // Adding all the COMPLETED Alerts to a separate set and removing them if they get added + // to alertsToUpdate to ensure the Alert doc is updated at the end in either case + completedAlertsToUpdate.addAll(completedAlerts) + // All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop val triggerCtx = triggerContexts[trigger.id]!! val triggerResult = triggerResults[trigger.id]!! @@ -482,12 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { ) for (action in trigger.actions) { if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) { - val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope - for (alertCategory in perAlertActionFrequency.actionableAlerts) { + val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope + for (alertCategory in perAlertActionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { - if (isBucketLevelTriggerActionThrottled(action, alert)) continue - val actionCtx = getActionContextForAlertCategory( alertCategory, alert, triggerCtx, monitorOrTriggerError ) @@ -497,9 +500,18 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf() } - val actionResult = runAction(action, actionCtx, dryrun) + // Keeping the throttled response separate from runAction for now since + // throttling is not supported for PER_EXECUTION + val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) { + ActionRunResult(action.id, action.name, mapOf(), true, null, null) + } else { + runAction(action, actionCtx, dryrun) + } triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult) alertsToUpdate.add(alert) + // Remove the alert from completedAlertsToUpdate in case it is present there since + // its update will be handled in the alertsToUpdate batch + completedAlertsToUpdate.remove(alert) } } } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) { @@ -527,6 +539,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult) alertsToUpdate.add(alert) + // Remove the alert from completedAlertsToUpdate in case it is present there since + // its update will be handled in the alertsToUpdate batch + completedAlertsToUpdate.remove(alert) } } } @@ -548,6 +563,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them. if (!dryrun && monitor.id != Monitor.NO_ID) { alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false) + // Save any COMPLETED Alerts that were not covered in updatedAlerts + alertService.saveAlerts(completedAlertsToUpdate.toList(), retryPolicy, allowUpdatingAcknowledgedAlert = false) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index c1f40ea15..22b787836 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -31,7 +31,7 @@ class AggregationQueryRewriter { * Add the bucket selector conditions for each trigger in input query. It also adds afterKeys from previous result * for each trigger. */ - fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List) { + fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List): SearchSourceBuilder { triggers.forEach { trigger -> if (trigger is BucketLevelTrigger) { // add bucket selector pipeline aggregation for each trigger in query @@ -65,6 +65,8 @@ class AggregationQueryRewriter { } } } + + return query } /** diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 59ebf6eb5..e8d6c10dc 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -40,6 +40,8 @@ import org.opensearch.alerting.model.Alert.State.COMPLETED import org.opensearch.alerting.model.Alert.State.ERROR import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.action.ActionExecutionPolicy +import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.model.action.PerAlertActionScope import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.CustomWebhook @@ -1140,10 +1142,10 @@ class MonitorRunnerIT : AlertingRestTestCase() { // Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was currentAlerts = searchAlerts(monitor) - val currentAcknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED } - val currentActiveAlert = currentAlerts.single { it.state == ACTIVE } - assertEquals("Acknowledged alert was updated", acknowledgedAlert.lastNotificationTime, currentAcknowledgedAlert.lastNotificationTime) - assertTrue("Active alert was not updated", currentActiveAlert.lastNotificationTime!! > activeAlert.lastNotificationTime) + val acknowledgedAlert2 = currentAlerts.single { it.state == ACKNOWLEDGED } + val activeAlert2 = currentAlerts.single { it.state == ACTIVE } + assertEquals("Acknowledged alert was updated", acknowledgedAlert.lastNotificationTime, acknowledgedAlert2.lastNotificationTime) + assertTrue("Active alert was not updated", activeAlert2.lastNotificationTime!! > activeAlert.lastNotificationTime) // Remove data so that both Alerts are moved into completed deleteDataWithDocIds( @@ -1162,13 +1164,18 @@ class MonitorRunnerIT : AlertingRestTestCase() { assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size) val previouslyAcknowledgedAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") } val previouslyActiveAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_2") } + // Note: Given the randomization of the Actions and ActionExecutionPolicy for the Bucket-Level Monitor + // there is a very small chance we could end up with COMPLETED Alerts that never had lastNotificationTime updated + // (This would occur if the Trigger contained Actions with ActionExecutionScope of PER_ALERT that all somehow excluded the + // same Alert categories being tested in this test) + // In such a rare case, the tests can just be rerun assertTrue( "Previously acknowledged alert was not updated when it moved to completed", - previouslyAcknowledgedAlert.lastNotificationTime!! > currentAcknowledgedAlert.lastNotificationTime + previouslyAcknowledgedAlert.lastNotificationTime!! > acknowledgedAlert2.lastNotificationTime ) assertTrue( "Previously active alert was not updated when it moved to completed", - previouslyActiveAlert.lastNotificationTime!! > currentActiveAlert.lastNotificationTime + previouslyActiveAlert.lastNotificationTime!! > activeAlert2.lastNotificationTime ) } @@ -1251,7 +1258,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { } @Suppress("UNCHECKED_CAST") - fun `test bucket-level monitor with per execution action frequency`() { + fun `test bucket-level monitor with per execution action scope`() { val testIndex = createTestIndex() insertSampleTimeSerializedData( testIndex, @@ -1320,6 +1327,195 @@ class MonitorRunnerIT : AlertingRestTestCase() { } } + fun `test bucket-level monitor with per alert action scope saves completed alerts even if not actionable`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", + "test_value_2", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + val action = randomAction( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + actionExecutionPolicy = ActionExecutionPolicy(null, PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))) + ) + var trigger = randomBucketLevelTrigger(actions = listOf(action)) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + executeMonitor(monitor.id) + + // Check created Alerts + var currentAlerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, currentAlerts.size) + currentAlerts.forEach { + verifyAlert(it, monitor, ACTIVE) + } + + // Remove data so that both Alerts are moved into completed + deleteDataWithDocIds( + testIndex, + listOf( + "1", // test_value_1 + "2", // test_value_1 + "3", // test_value_2 + "4" // test_value_2 + ) + ) + + // Execute Monitor and check that both Alerts were moved to COMPLETED + executeMonitor(monitor.id) + currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + val completedAlerts = currentAlerts.filter { it.state == COMPLETED } + assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size) + } + + @Suppress("UNCHECKED_CAST") + fun `test bucket-level monitor throttling with per alert action scope`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + val actionThrottleEnabled = randomAction( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + throttleEnabled = true, + actionExecutionPolicy = ActionExecutionPolicy( + throttle = Throttle(value = 5, unit = MINUTES), + actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)) + ) + ) + val actionThrottleNotEnabled = randomAction( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + throttleEnabled = false, + actionExecutionPolicy = ActionExecutionPolicy( + throttle = Throttle(value = 5, unit = MINUTES), + actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)) + ) + ) + val actions = listOf(actionThrottleEnabled, actionThrottleNotEnabled) + var trigger = randomBucketLevelTrigger(actions = actions) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + + val monitorRunResultNotThrottled = entityAsMap(executeMonitor(monitor.id)) + verifyActionThrottleResultsForBucketLevelMonitor( + monitorRunResult = monitorRunResultNotThrottled, + expectedEvents = setOf("test_value_1", "test_value_2"), + expectedActionResults = mapOf( + Pair(actionThrottleEnabled.id, false), + Pair(actionThrottleNotEnabled.id, false) + ) + ) + + val notThrottledAlerts = searchAlerts(monitor) + assertEquals("Alerts may not have been saved correctly", 2, notThrottledAlerts.size) + val previousAlertExecutionTime: MutableMap> = mutableMapOf() + notThrottledAlerts.forEach { + verifyAlert(it, monitor, ACTIVE) + val notThrottledActionResults = verifyActionExecutionResultInAlert( + it, + mutableMapOf(Pair(actionThrottleEnabled.id, 0), Pair(actionThrottleNotEnabled.id, 0)) + ) + assertEquals(notThrottledActionResults.size, 2) + // Save the lastExecutionTimes of the actions for the Alert to be compared later against + // the next Monitor execution run + previousAlertExecutionTime[it.id] = mutableMapOf() + previousAlertExecutionTime[it.id]!![actionThrottleEnabled.id] = + notThrottledActionResults[actionThrottleEnabled.id]!!.lastExecutionTime + previousAlertExecutionTime[it.id]!![actionThrottleNotEnabled.id] = + notThrottledActionResults[actionThrottleNotEnabled.id]!!.lastExecutionTime + } + + // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to + // let Action executionTime change. W/o this sleep the test can result in a false negative. + Thread.sleep(200) + val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id)) + verifyActionThrottleResultsForBucketLevelMonitor( + monitorRunResult = monitorRunResultThrottled, + expectedEvents = setOf("test_value_1", "test_value_2"), + expectedActionResults = mapOf( + Pair(actionThrottleEnabled.id, true), + Pair(actionThrottleNotEnabled.id, false) + ) + ) + + val throttledAlerts = searchAlerts(monitor) + assertEquals("Alerts may not have been saved correctly", 2, throttledAlerts.size) + throttledAlerts.forEach { + verifyAlert(it, monitor, ACTIVE) + val throttledActionResults = verifyActionExecutionResultInAlert( + it, + mutableMapOf(Pair(actionThrottleEnabled.id, 1), Pair(actionThrottleNotEnabled.id, 0)) + ) + assertEquals(throttledActionResults.size, 2) + + val prevthrottledActionLastExecutionTime = previousAlertExecutionTime[it.id]!![actionThrottleEnabled.id] + val prevNotThrottledActionLastExecutionTime = previousAlertExecutionTime[it.id]!![actionThrottleNotEnabled.id] + assertEquals( + "Last execution time of a throttled action was updated for one of the Alerts", + prevthrottledActionLastExecutionTime, + throttledActionResults[actionThrottleEnabled.id]!!.lastExecutionTime + ) + assertTrue( + "Last execution time of a non-throttled action was not updated for one of the Alerts", + throttledActionResults[actionThrottleNotEnabled.id]!!.lastExecutionTime!! > prevNotThrottledActionLastExecutionTime + ) + } + } + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { @@ -1380,6 +1576,24 @@ class MonitorRunnerIT : AlertingRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + private fun verifyActionThrottleResultsForBucketLevelMonitor( + monitorRunResult: MutableMap, + expectedEvents: Set, + expectedActionResults: Map + ) { + for (triggerResult in monitorRunResult.objectMap("trigger_results").values) { + for (alertEvent in triggerResult.objectMap("action_results")) { + assertTrue(expectedEvents.contains(alertEvent.key)) + val actionResults = alertEvent.value.values as Collection> + for (actionResult in actionResults) { + val expected = expectedActionResults[actionResult["id"]] + assertEquals(expected, actionResult["throttled"]) + } + } + } + } + private fun verifyAlert(alert: Alert, monitor: Monitor, expectedState: Alert.State = ACTIVE) { assertNotNull(alert.id) assertNotNull(alert.startTime) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index b3fe3455a..e1de5dba4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -254,7 +254,7 @@ fun randomThrottle( fun randomActionExecutionPolicy( throttle: Throttle = randomThrottle(), - actionExecutionScope: ActionExecutionScope = randomActionExecutionFrequency() + actionExecutionScope: ActionExecutionScope = randomActionExecutionScope() ): ActionExecutionPolicy { return if (actionExecutionScope is PerExecutionActionScope) { // Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it @@ -264,7 +264,7 @@ fun randomActionExecutionPolicy( } } -fun randomActionExecutionFrequency(): ActionExecutionScope { +fun randomActionExecutionScope(): ActionExecutionScope { return if (randomBoolean()) { val alertCategories = AlertCategory.values() PerAlertActionScope(actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet())