Skip to content

Commit

Permalink
added tests. fixed document ids in bucket level monitor findings
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Nov 5, 2022
1 parent ca84f49 commit 43bcd9b
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
Expand All @@ -16,14 +17,14 @@ import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
Expand All @@ -39,6 +40,7 @@ import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -380,12 +382,14 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean
): List<String> {
val docIdsByIndexName: MutableMap<String, List<String>> = mutableMapOf()
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
val ids = docIdsByIndexName.getOrDefault(hit.index, mutableListOf())
ids.add(hit.id)
docIdsByIndexName[hit.index] = ids
}
val findings = mutableListOf<String>()
var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
docIdsByIndexName.entries.forEach { it ->
run {
val finding = Finding(
Expand All @@ -406,15 +410,27 @@ object BucketLevelMonitorRunner : MonitorRunner() {
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)

monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
monitorCtx.client!!.index(indexRequest, it)
}
requestsToRetry.add(indexRequest)
}

findings.add(finding.id)
}
}
if (requestsToRetry.isEmpty()) return listOf()
monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = mutableListOf()
val findingsBeingRetried = mutableListOf<Alert>()
bulkResponse.items.forEach { item ->
if (item.isFailed) {
if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest)
findingsBeingRetried.add(findingsBeingRetried[item.itemId])
}
}
}
}
return findings
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" }
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
)
Expand Down Expand Up @@ -835,7 +836,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value"
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Alerts not saved", 2, currentAlerts.size)
currentAlerts.forEach {
verifyAlert(it, monitor, ACTIVE)
Assert.assertEquals("expected no findings for alert", it.findingIds.size, 0)
}

// Acknowledge one of the Alerts
Expand Down Expand Up @@ -1386,6 +1385,78 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
currentAlerts.forEach { alert ->
Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1)
}
val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
}

fun `test bucket-level monitor with findings enabled for multiple group by fields`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
listOf(
"test_value_1",
"test_value_2"
)
)

val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field"),
TermsValuesSourceBuilder("number").field("number")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
val triggerScript = """
params.docCount > 0
""".trimIndent()

// For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy
// so that the assertions done later in this test don't fail.
// The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope).
val actions = randomActionsForBucketLevelTrigger(min = 1).map {
if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) {
it.copy(
actionExecutionPolicy = ActionExecutionPolicy(
PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED))
)
)
} else {
it
}
}
var trigger = randomBucketLevelTrigger(actions = actions)
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null
)
)
val monitor = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(findingsEnabled = true)
)
)
executeMonitor(monitor.id)

// Check created Alerts
var currentAlerts = searchAlerts(monitor)
assertEquals("Alerts not saved", 2, currentAlerts.size)
currentAlerts.forEach { alert ->
Assert.assertEquals("expected findings for alert", alert.findingIds.size, 0)
}
val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 0, findings.size)
}

@Suppress("UNCHECKED_CAST")
Expand Down

0 comments on commit 43bcd9b

Please sign in to comment.