diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 983f9b472..66c369033 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -6,8 +6,9 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest @@ -16,6 +17,7 @@ import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext @@ -23,7 +25,6 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult -import org.opensearch.client.Client import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder @@ -39,6 +40,7 @@ import org.opensearch.commons.alerting.model.action.PerExecutionActionScope import org.opensearch.commons.alerting.util.string import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript @@ -380,12 +382,14 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, shouldCreateFinding: Boolean ): List { - val docIdsByIndexName: MutableMap> = mutableMapOf() + val docIdsByIndexName: MutableMap> = mutableMapOf() for (hit in searchResponse.hits.hits) { val ids = docIdsByIndexName.getOrDefault(hit.index, mutableListOf()) + ids.add(hit.id) docIdsByIndexName[hit.index] = ids } val findings = mutableListOf() + var requestsToRetry: MutableList = mutableListOf() docIdsByIndexName.entries.forEach { it -> run { val finding = Finding( @@ -406,15 +410,27 @@ object BucketLevelMonitorRunner : MonitorRunner() { .source(findingStr, XContentType.JSON) .id(finding.id) .routing(finding.id) - - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) - } + requestsToRetry.add(indexRequest) } - findings.add(finding.id) } } + if (requestsToRetry.isEmpty()) return listOf() + monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { + val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) } + val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } + requestsToRetry = mutableListOf() + val findingsBeingRetried = mutableListOf() + bulkResponse.items.forEach { item -> + if (item.isFailed) { + if (item.status() == RestStatus.TOO_MANY_REQUESTS) { + requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest) + findingsBeingRetried.add(findingsBeingRetried[item.itemId]) + } + } + } + } return findings } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 647132a1a..924b5d483 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -748,7 +748,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, - "test_field" : { "type" : "keyword" } + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } } """.trimIndent() ) @@ -835,7 +836,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val testDoc = """ { "test_strict_date_time": "$testTime", - "test_field": "$value" + "test_field": "$value", + "number": "$i" } """.trimIndent() // Indexing documents with deterministic doc id to allow for easy selected deletion during testing diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 662f74880..8b99ec5dd 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1274,7 +1274,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Alerts not saved", 2, currentAlerts.size) currentAlerts.forEach { verifyAlert(it, monitor, ACTIVE) - Assert.assertEquals("expected no findings for alert", it.findingIds.size, 0) } // Acknowledge one of the Alerts @@ -1386,6 +1385,78 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { currentAlerts.forEach { alert -> Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1) } + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + } + + fun `test bucket-level monitor with findings enabled for multiple group by fields`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field"), + TermsValuesSourceBuilder("number").field("number") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + // For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy + // so that the assertions done later in this test don't fail. + // The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope). + val actions = randomActionsForBucketLevelTrigger(min = 1).map { + if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) { + it.copy( + actionExecutionPolicy = ActionExecutionPolicy( + PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED)) + ) + ) + } else { + it + } + } + var trigger = randomBucketLevelTrigger(actions = actions) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources(findingsEnabled = true) + ) + ) + executeMonitor(monitor.id) + + // Check created Alerts + var currentAlerts = searchAlerts(monitor) + assertEquals("Alerts not saved", 2, currentAlerts.size) + currentAlerts.forEach { alert -> + Assert.assertEquals("expected findings for alert", alert.findingIds.size, 0) + } + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 0, findings.size) } @Suppress("UNCHECKED_CAST")