From 0d58aad4bcfa9f4f0c08e36b29fa2b583c214669 Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Tue, 9 Aug 2022 14:03:36 -0700
Subject: [PATCH] Multi rollup ndx search (#453) (#455)

* added support for searching multiple rollup indices with same mapping

Signed-off-by: Petar Dzepina

* fixed failing rollupInterceptorIT test

Signed-off-by: Petar Dzepina

* reverted old error messages

Signed-off-by: Petar Dzepina

* reverted checking for matching jobs on whole set instead of job by job; Added picking rollup job deterministic

Signed-off-by: petar.dzepina

* fixed sorting

Signed-off-by: petar.dzepina

* added ITs for multi rollup index search

Signed-off-by: petardz

* added ITs for multi rollup index search#2

Signed-off-by: petardz

* detekt fixes

Signed-off-by: petardz

* changed index names and rollup job

Signed-off-by: petardz

* detekt fix

Signed-off-by: petardz

* empty commit to trigger test pipeline

Signed-off-by: petardz

Co-authored-by: petar.dzepina
(cherry picked from commit af5b3666b8836ac7b5abef2c89f66825ac95407c)

Co-authored-by: Petar Dzepina
Signed-off-by: Angie Zhang
---
 .../IndexManagementRestTestCase.kt            |  14 +
 .../rollup/interceptor/RollupInterceptorIT.kt | 288 ++++++++++++++++++
 2 files changed, 302 insertions(+)

diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
index 82308c85e..d22d6876d 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
@@ -138,6 +138,20 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
         insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
     }
 
+    protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
+        val shards = searchResponse.asMap()["_shards"] as Map<String, List<Map<String, Any>>>
+        assertNotNull(shards)
+        val failures = shards["failures"]
+        assertNotNull(failures)
+        return failures?.let {
+            val result: ArrayList<Map<String, String>?>? = ArrayList()
+            for (failure in it) {
+                result?.add((failure as Map<String, Map<String, String>>)["reason"])
+            }
+            return result
+        }
+    }
+
     companion object {
         internal interface IProxy {
             val version: String?
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt
index ca1fe3c36..ff13584ff 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt
@@ -789,4 +789,292 @@ class RollupInterceptorIT : RollupRestTestCase() {
             trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"]
         )
     }
+
+    fun `test rollup search multiple target indices successfully`() {
+        val sourceIndex1 = "source_rollup_search_multi_jobs_1"
+        val sourceIndex2 = "source_rollup_search_multi_jobs_2"
+        generateNYCTaxiData(sourceIndex1)
+        generateNYCTaxiData(sourceIndex2)
+        val targetIndex1 = "target_rollup_search_multi_jobs1"
+        val targetIndex2 = "target_rollup_search_multi_jobs2"
+        val rollupHourly1 = Rollup(
+            id = "hourly_basic_term_query_rollup_search_multi_1",
+            enabled = true,
+            schemaVersion = 1L,
+            jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
+            jobLastUpdatedTime = Instant.now(),
+            jobEnabledTime = Instant.now(),
+            description = "basic search test",
+            sourceIndex = sourceIndex1,
+            targetIndex = targetIndex1,
+            metadataID = null,
+            roles = emptyList(),
+            pageSize = 10,
+            delay = 0,
+            continuous = false,
+            dimensions = listOf(
+                DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
+                Terms("RatecodeID", "RatecodeID"),
+                Terms("PULocationID", "PULocationID")
+            ),
+            metrics = listOf(
+                RollupMetrics(
+                    sourceField = "passenger_count", targetField = "passenger_count",
+                    metrics = listOf(
+                        Sum(), Min(), Max(),
+                        ValueCount(), Average()
+                    )
+                ),
+                RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
+            )
+        ).let { createRollup(it, it.id) }
+
+        updateRollupStartTime(rollupHourly1)
+
+        waitFor {
+            val rollupJob = getRollup(rollupId = rollupHourly1.id)
+            assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
+            val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
+            assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
+        }
+
+        val rollupHourly2 = Rollup(
+            id = "hourly_basic_term_query_rollup_search_multi_2",
+            enabled = true,
+            schemaVersion = 1L,
+            jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
+            jobLastUpdatedTime = Instant.now(),
+            jobEnabledTime = Instant.now(),
+            description = "basic search test",
+            sourceIndex = sourceIndex2,
+            targetIndex = targetIndex2,
+            metadataID = null,
+            roles = emptyList(),
+            pageSize = 10,
+            delay = 0,
+            continuous = false,
+            dimensions = listOf(
+                DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
+                Terms("RatecodeID", "RatecodeID"),
+                Terms("PULocationID", "PULocationID")
+            ),
+            metrics = listOf(
+                RollupMetrics(
+                    sourceField = "passenger_count", targetField = "passenger_count",
+                    metrics = listOf(
+                        Sum(), Min(), Max(),
+                        ValueCount(), Average()
+                    )
+                ),
+                RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min()))
+            )
+        ).let { createRollup(it, it.id) }
+
+        updateRollupStartTime(rollupHourly2)
+
+        waitFor {
+            val rollupJob = getRollup(rollupId = rollupHourly2.id)
+            assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
+            val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + val req = """ + { + "size": 0, + "query": { + "term": { "RatecodeID": 1 } + }, + "aggs": { + "sum_passenger_count": { "sum": { "field": "passenger_count" } }, + "max_passenger_count": { "max": { "field": "passenger_count" } }, + "value_count_passenger_count": { "value_count": { "field": "passenger_count" } } + } + } + """.trimIndent() + val rawRes1 = client().makeRequest("POST", "/$sourceIndex1/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes1.restStatus() == RestStatus.OK) + val rawRes2 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes2.restStatus() == RestStatus.OK) + val rollupResMulti = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupResMulti.restStatus() == RestStatus.OK) + val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> + val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> + val rollupAggResMulti = rollupResMulti.asMap()["aggregations"] as Map> + + // When the cluster setting to search all jobs is off, the aggregations will be the same for searching a single job as for searching both + assertEquals( + "Searching single rollup job and rollup target index did not return the same max results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"] + ) + assertEquals( + "Searching single rollup job and rollup target index did not return the same sum results", + rawAgg1Res.getValue("sum_passenger_count")["value"], rollupAggResMulti.getValue("sum_passenger_count")["value"] + ) + val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int + assertEquals( + "Searching single rollup job and rollup target index did not return the same value count results", + rawAgg1Res.getValue("value_count_passenger_count")["value"], rollupAggResMulti.getValue("value_count_passenger_count")["value"] + ) + + val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double + updateSearchAllJobsClusterSetting(true) + + val rollupResAll = client().makeRequest("POST", "/$targetIndex1,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupResAll.restStatus() == RestStatus.OK) + val rollupAggResAll = rollupResAll.asMap()["aggregations"] as Map> + + // With search all jobs setting on, the sum, and value_count will now be equal to the sum of the single job search results + assertEquals( + "Searching single rollup job and rollup target index did not return the same sum results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResAll.getValue("max_passenger_count")["value"] + ) + assertEquals( + "Searching rollup target index did not return the sum for all of the rollup jobs on the index", + trueAggSum, rollupAggResAll.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "Searching rollup target index did not return the value count for all of the rollup jobs on the index", + trueAggCount, rollupAggResAll.getValue("value_count_passenger_count")["value"] + ) + } + + fun `test rollup search multiple target indices failed`() { + val sourceIndex1 = 
"source_rollup_search_multi_failed_1" + val sourceIndex2 = "source_rollup_search_multi_failed_2" + generateNYCTaxiData(sourceIndex1) + generateNYCTaxiData(sourceIndex2) + val targetIndex1 = "target_rollup_search_multi_failed_jobs1" + val targetIndex2 = "target_rollup_search_multi_failed_jobs2" + val rollupJob1 = Rollup( + id = "hourly_basic_term_query_rollup_search_failed_1", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex1, + targetIndex = targetIndex1, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("VendorID", "VendorID"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "fare_amount", targetField = "fare_amount", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "improvement_surcharge", targetField = "improvement_surcharge", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollupJob1) + + waitFor { + val rollupJob = getRollup(rollupId = rollupJob1.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + val rollupJob2 = Rollup( + id = "hourly_basic_term_query_rollup_search_failed_2", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex2, + targetIndex = targetIndex2, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_dropoff_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollupJob2) + + waitFor { + val rollupJob = getRollup(rollupId = rollupJob2.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + val req = """ + { + "size": 0, + "query": { + "term": { "RatecodeID": 1 } + }, + "aggs": { + "sum_passenger_count": { "sum": { "field": "passenger_count" } }, + "max_passenger_count": { "max": { "field": "passenger_count" } }, + "value_count_passenger_count": { "value_count": { "field": "passenger_count" } } + } + } + """.trimIndent() + // Search 1 non-rollup index and 1 rollup + val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(searchResult1.restStatus() == RestStatus.OK) + val failures = extractFailuresFromSearchResponse(searchResult1) + assertNotNull(failures) + assertEquals(1, failures?.size) + assertEquals( + "Searching multiple indices where one is rollup and other is not, didn't return failure", + "illegal_argument_exception", failures?.get(0)?.get("type") ?: "Didn't find failure type in search response" + + ) + assertEquals( + "Searching multiple indices where one is rollup and other is not, didn't return failure", + "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response" + ) + + // Search 2 rollups with different mappings + try { + client().makeRequest( + "POST", + "/$targetIndex1,$targetIndex2/_search", + emptyMap(), + StringEntity(req, ContentType.APPLICATION_JSON) + ) + } catch (e: ResponseException) { + assertEquals( + "Searching multiple rollup indices which weren't created by same rollup job, didn't return failure", + "Could not find a rollup job that can answer this query because [missing field RatecodeID, missing field passenger_count]", + (e.response.asMap() as Map>>)["error"]!!["caused_by"]!!["reason"] + ) + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } }