Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bucket level monitor findings to support term aggs in query #666

Merged
merged 1 commit into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID
Expand All @@ -71,6 +72,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
if (monitor.dataSources.findingsEnabled == true) {
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
}
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down Expand Up @@ -142,15 +146,19 @@ object BucketLevelMonitorRunner : MonitorRunner() {
*/
if (triggerResults[trigger.id]?.error != null) continue
val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
else emptyList()
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) {
logger.debug("Creating bucket level findings")
createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
} else {
emptyList()
}
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
Expand Down Expand Up @@ -334,15 +342,30 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
val query = input.query
var fieldName = ""
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings

for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
val sources = (aggFactory as CompositeAggregationBuilder).sources()
for (source in sources) {
if (grouByFields > 0) {
when (aggFactory) {
is CompositeAggregationBuilder -> {
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
val sources = aggFactory.sources()
for (source in sources) {
if (grouByFields > 0) {
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
return listOf()
}
grouByFields++
fieldName = source.field()
}
}
is TermsAggregationBuilder -> {
fieldName = aggFactory.field()
}
else -> {
logger.error(
"Bucket level monitor findings supported only for composite and term aggs. Found [{${aggFactory.type}}]"
)
return listOf()
}
grouByFields++
fieldName = source.field()
}
}
if (fieldName != "") {
Expand Down Expand Up @@ -370,6 +393,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
}
}
Expand Down Expand Up @@ -403,8 +428,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
logger.debug("Bucket level monitor ${monitor.id} Findings: $findingStr")
if (shouldCreateFinding) {
logger.debug("Saving bucket level monitor findings for monitor ${monitor.id}")
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.net.URLEncoder
import java.time.Instant
Expand Down Expand Up @@ -1322,7 +1323,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)
}

fun `test bucket-level monitor with findings enabled`() {
fun `test bucket-level monitor with findings enabled on term agg`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
listOf(
"test_value_1",
"test_value_2"
)
)

val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val termAgg = TermsAggregationBuilder("test_field").field("test_field")
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(termAgg))
val triggerScript = """
params.docCount > 0
""".trimIndent()

// For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy
// so that the assertions done later in this test don't fail.
// The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope).
val actions = randomActionsForBucketLevelTrigger(min = 1).map {
if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) {
it.copy(
actionExecutionPolicy = ActionExecutionPolicy(
PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED))
)
)
} else {
it
}
}
var trigger = randomBucketLevelTrigger(actions = actions)
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "test_field",
filter = null
)
)
val monitor = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(findingsEnabled = true)
)
)
executeMonitor(monitor.id)

// Check created Alerts
var currentAlerts = searchAlerts(monitor)
assertEquals("Alerts not saved", 2, currentAlerts.size)
currentAlerts.forEach { alert ->
Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1)
}
val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
}

fun `test bucket-level monitor with findings enabled on composite agg`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
Expand Down