From cc4bfa43b54cc87efad0b82aeba86579938d9d34 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 21 Oct 2024 02:43:25 -0700 Subject: [PATCH] fix tests to verify aggregation query on alias optimziation with indices being skipped and not skipped Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/InputService.kt | 7 +- .../alerting/AlertingRestTestCase.kt | 19 ++++++ .../alerting/MonitorRunnerServiceIT.kt | 66 +++++++++++++------ 3 files changed, 69 insertions(+), 23 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 2d7967076..1a6482c79 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -305,8 +305,10 @@ class InputService( resolvedIndexes.add(indexMetadata.index.name) includePrevious = false // No need to include previous anymore } else if ( - includePrevious && i > 0 && sortedIndices[i - 1].creationDate < - resolveStartTimeOfQueryTimeRange.toEpochMilli() + includePrevious && ( + i == sortedIndices.lastIndex || + sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) ) { // Include the index immediately before the timestamp resolvedIndexes.add(indexMetadata.index.name) @@ -314,6 +316,7 @@ class InputService( } } } else { + // add alias without optimizing for resolve indices resolvedIndexes.add(it) } } else { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 806d4f5b3..37190f4b1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1261,6 +1261,25 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } + protected fun insertSampleTimeSerializedDataWithTime( + index: String, + data: List, + time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS), + ) { + data.forEachIndexed { i, value -> + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + protected fun deleteDataWithDocIds(index: String, docIds: List) { docIds.forEach { deleteDoc(index, it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index ec166a535..390fc80f3 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1186,16 +1186,10 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } - fun `test execute bucket-level monitor with alias optimization - indices not skipped`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, - listOf( - "test_value_3", - "test_value_4", // adding duplicate to verify aggregation - "test_value_5" - ) - ) + fun `test execute bucket-level monitor with alias optimization - indices not skipped from query`() { + val skipIndex = createTestIndex("to_skip_index") + val previousIndex = createTestIndex("to_include_index") + val indexMapping = """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, @@ -1213,7 +1207,24 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + insertSampleTimeSerializedDataWithTime( + previousIndex, + listOf( + "test_value_3", + "test_value_4", + "test_value_5" + ) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ) + ) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1246,18 +1257,30 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - assertEquals("Incorrect search result", 5, buckets.size) + Assert.assertEquals(buckets.size, 8) } fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, + val skipIndex = createTestIndex("to_skip_index") + Thread.sleep(10000) + val previousIndex = createTestIndex("to_include_index") + insertSampleTimeSerializedDataWithTime( + previousIndex, listOf( - "test_value_1", - "test_value_1", // adding duplicate to verify aggregation - "test_value_2" - ) + "test_value_3", + "test_value_4", + "test_value_5" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) ) Thread.sleep(10000) val indexMapping = """ @@ -1277,7 +1300,8 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1310,7 +1334,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - Assert.assertTrue(buckets.size <= 2) + Assert.assertTrue(buckets.size <= 5) } fun `test execute bucket-level monitor returns search result with multi term agg`() {