Skip to content

Commit

Permalink
added handling of multi-term agg in bucketlevel monitors
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz committed Jun 18, 2023
1 parent 32c0bb0 commit 86553bf
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class TriggerService(val scriptService: ScriptService) {
val keyField = Aggregation.CommonFields.KEY.preferredName
val keyValuesList = mutableListOf<String>()
when {
bucket[keyField] is List<*> && bucket.containsKey(Aggregation.CommonFields.KEY_AS_STRING.preferredName) ->
keyValuesList.add(bucket[Aggregation.CommonFields.KEY_AS_STRING.preferredName] as String)
bucket[keyField] is String -> keyValuesList.add(bucket[keyField] as String)
// In the case where the key field is an Int
bucket[keyField] is Int -> keyValuesList.add(bucket[keyField].toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import java.net.URLEncoder
import java.time.Instant
Expand Down Expand Up @@ -1149,7 +1152,8 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
TermsValuesSourceBuilder("test_field").field("test_field"),
TermsValuesSourceBuilder("number").field("number")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
Expand Down Expand Up @@ -1179,6 +1183,81 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test execute bucket-level monitor returns search result with multi term agg`() {
val index = "test_index_1234"
indexDoc(
index,
"1",
"""{"user_id": "1",
"ip_addr": "12345678",
"user_agent": "chrome"
}
""".trimIndent()
)
indexDoc(
index,
"2",
"""{"user_id": "1",
"ip_addr": "12345678",
"user_agent": "chrome"
}
""".trimIndent()
)
indexDoc(
index,
"3",
"""{"user_id": "2",
"ip_addr": "3443534",
"user_agent": "chrome"
}
""".trimIndent()
)

val triggerScript = """
params.docCount > 0
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("_value" to "distinct_user_count", "docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "hot",
filter = null
)
)

val m = randomBucketLevelMonitor(
triggers = listOf(trigger),
inputs = listOf(
SearchInput(
listOf(index),
SearchSourceBuilder().aggregation(
MultiTermsAggregationBuilder("hot")
.terms(
listOf(
MultiTermsValuesSourceConfig.Builder().setFieldName("ip_addr.keyword").build(),
MultiTermsValuesSourceConfig.Builder().setFieldName("user_agent.keyword").build()
)
)
.subAggregation(CardinalityAggregationBuilder("distinct_user_count").field("user_id.keyword"))
)
)
)
)
val monitor = createMonitor(m)
val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("hot")?.get("buckets") as List<Map<String, Any>>
assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test bucket-level monitor alert creation and completion`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
Expand Down

0 comments on commit 86553bf

Please sign in to comment.