diff --git a/detekt.yml b/detekt.yml index 77c5afe01..8ed992998 100644 --- a/detekt.yml +++ b/detekt.yml @@ -22,4 +22,6 @@ complexity: LongMethod: excludes: ['**/test/**'] LongParameterList: - excludes: ['**/test/**'] \ No newline at end of file + excludes: ['**/test/**'] + NestedBlockDepth: + threshold: 5 \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt index ac077a707..55db96c21 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt @@ -116,8 +116,9 @@ class RollupIndexer( it.aggregations.forEach { when (it) { is InternalSum -> aggResults[it.name] = it.value - is InternalMax -> aggResults[it.name] = it.value - is InternalMin -> aggResults[it.name] = it.value + // TODO: Need to redo the logic in corresponding doXContentBody of InternalMax and InternalMin + is InternalMax -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value + is InternalMin -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value is InternalValueCount -> aggResults[it.name] = it.value is InternalAvg -> aggResults[it.name] = it.value else -> error("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 7ca40adab..733d9faf7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -153,6 +153,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText()) } + protected fun generateMessageLogsData(index: String = "message-logs") { + createIndex(index, Settings.EMPTY, """"properties": {"message":{"properties":{"bytes_in":{"type":"long"},"bytes_out":{"type":"long"},"plugin":{"eager_global_ordinals":true,"ignore_above":10000,"type":"keyword"},"timestamp_received":{"type":"date"}}}}""") + insertSampleBulkData(index, javaClass.classLoader.getResource("data/message_logs.ndjson").readText()) + } + @Suppress("UNCHECKED_CAST") protected fun extractFailuresFromSearchResponse(searchResponse: Response): List?>? { val shards = searchResponse.asMap()["_shards"] as Map>> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 17028ccb7..529f166f4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -1319,6 +1319,78 @@ class RollupRunnerIT : RollupRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test rollup with max metric when metric property not present`() { + val sourceIdxTestName = "source_idx_test_max" + val targetIdxTestName = "target_idx_test_max" + val propertyName = "message.bytes_in" + val maxMetricName = "min_message_bytes_in" + + generateMessageLogsData(sourceIdxTestName) + val rollup = Rollup( + id = "rollup_test_max", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic stats test", + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + metadataID = null, + roles = emptyList(), + pageSize = 100, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "message.timestamp_received", targetField = "message.timestamp_received", fixedInterval = "10m"), + Terms("message.plugin", "message.plugin") + ), + metrics = listOf( + RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Max())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) } + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + + // Term query + val req = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "$maxMetricName": { + "max": { + "field": "$propertyName" + } + } + } + } + """.trimIndent() + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same max results", + rawAggRes.getValue(maxMetricName)["value"], + rollupAggRes.getValue(maxMetricName)["value"] + ) + } + } + // TODO: Test scenarios: // - Source index deleted after first execution // * If this is with a source index pattern and the underlying indices are recreated but with different data diff --git a/src/test/resources/data/message_logs.ndjson b/src/test/resources/data/message_logs.ndjson new file mode 100644 index 000000000..06b92003e --- /dev/null +++ b/src/test/resources/data/message_logs.ndjson @@ -0,0 +1,4 @@ +{"create":{}} +{"message":{"bytes_out":4256,"plugin":"AlienVault NIDS","timestamp_received":1689786716020}} +{"create":{}} +{"message":{"bytes_out":4526,"plugin":"AlienVault NIDS","timestamp_received":1689886716020}}