From e7c254d5eef20305fd57e4ae86c619db5ce57e7b Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 30 Nov 2022 19:53:59 +0100 Subject: [PATCH 01/21] IT rollup fix Signed-off-by: Petar Dzepina --- .../rollup/runner/RollupRunnerIT.kt | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index b57e9e5d2..c1fb4639a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -204,7 +204,7 @@ class RollupRunnerIT : RollupRestTestCase() { sourceIndex = indexName, targetIndex = "${indexName}_target", metadataID = null, - continuous = true + continuous = false ) // Create source index @@ -222,24 +222,30 @@ class RollupRunnerIT : RollupRestTestCase() { updateRollupStartTime(rollup) var previousRollupMetadata: RollupMetadata? = null - waitFor { + rollup = waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job not found", rollupJob) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) previousRollupMetadata = getRollupMetadata(rollupJob.metadataID!!) assertNotNull("Rollup metadata not found", previousRollupMetadata) - assertEquals("Unexpected metadata status", RollupMetadata.Status.INIT, previousRollupMetadata!!.status) + assertEquals("Unexpected metadata status", RollupMetadata.Status.FINISHED, previousRollupMetadata!!.status) + rollupJob } - // Delete rollup metadata assertNotNull("Previous rollup metadata was not saved", previousRollupMetadata) deleteRollupMetadata(previousRollupMetadata!!.id) - // Update rollup start time to run second execution + // Enable rollup and Update start time to run second execution + client().makeRequest( + "PUT", + "$ROLLUP_JOBS_BASE_URI/${rollup.id}?if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}", + emptyMap(), rollup.copy(enabled = true).toHttpEntity() + ) + updateRollupStartTime(rollup) - waitFor() { + waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job not found", rollupJob) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) From 6f60111aa37d27c0ad4a0768e10c4c8d16ab779c Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Thu, 1 Dec 2022 02:57:23 +0100 Subject: [PATCH 02/21] ITs fix Signed-off-by: Petar Dzepina --- .../rollup/RollupRestTestCase.kt | 43 +++++++++++++++++-- .../rollup/runner/RollupRunnerIT.kt | 3 +- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 8794db492..2025798de 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -53,7 +53,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { @After @Suppress("UNCHECKED_CAST") - fun killAllRunningTasks() { + fun KillAllCancallableRunningTasks() { client().makeRequest("POST", "_tasks/_cancel?actions=*") waitFor { val response = client().makeRequest("GET", "_tasks") @@ -71,11 +71,40 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun waitForCancallableTasksToFinish() { + waitFor { + val response = 
client().makeRequest("GET", "_tasks") + val nodes = response.asMap()["nodes"] as Map + var hasCancallableRunningTasks = false + nodes.forEach { + val tasks = (it.value as Map)["tasks"] as Map + tasks.forEach { e -> + if ((e.value as Map)["cancellable"] as Boolean) { + hasCancallableRunningTasks = true + logger.info("cancellable task running: ${e.key}") + } + } + } + assertFalse(hasCancallableRunningTasks) + } + } + @Before fun setDebugLogLevel() { client().makeRequest( "PUT", "_cluster/settings", - StringEntity("""{"transient":{"logger.org.opensearch.indexmanagement.rollup":"DEBUG"}}""", APPLICATION_JSON) + StringEntity( + """ + { + "transient": { + "logger.org.opensearch.indexmanagement.rollup":"DEBUG", + "logger.org.opensearch.jobscheduler":"DEBUG" + } + } + """.trimIndent(), + APPLICATION_JSON + ) ) } @@ -254,10 +283,16 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { } val intervalSchedule = (update.jobSchedule as IntervalSchedule) val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis + val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) val waitForActiveShards = if (isMultiNode) "all" else "1" + // TODO flaky: Add this log to confirm this update is missed by job scheduler + // This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node + // However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node + // During this period, this update got missed + // Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this. + logger.info("Update rollup start time to $startTimeMillis") val response = client().makeRequest( - "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", + "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true", StringEntity( "{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" + "\"$startTimeMillis\"}}}}}", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index c1fb4639a..30eb73d80 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -226,6 +226,7 @@ class RollupRunnerIT : RollupRestTestCase() { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job not found", rollupJob) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + assertFalse("Rollup job is still enabled", rollupJob.enabled) previousRollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
assertNotNull("Rollup metadata not found", previousRollupMetadata) @@ -240,7 +241,7 @@ class RollupRunnerIT : RollupRestTestCase() { client().makeRequest( "PUT", "$ROLLUP_JOBS_BASE_URI/${rollup.id}?if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}", - emptyMap(), rollup.copy(enabled = true).toHttpEntity() + emptyMap(), rollup.copy(enabled = true, jobEnabledTime = Instant.now()).toHttpEntity() ) updateRollupStartTime(rollup) From 3f1f2845de5317c379856f1d90372249ade4f0e6 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 03:05:17 +0100 Subject: [PATCH 03/21] added more ITs Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 10 +- .../rollup/interceptor/RollupInterceptorIT.kt | 146 ++++++++++++++++++ 2 files changed, 153 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index cac99a8c2..8b56c464f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -214,9 +214,13 @@ class RollupInterceptor( fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) } is QueryStringQueryBuilder -> { - rewriteQueryStringQueryBuilder(query, concreteSourceIndexName) { - fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.toString(), Dimension.Type.TERMS.type)) - it + try { + rewriteQueryStringQueryBuilder(query, concreteSourceIndexName) { + fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.toString(), Dimension.Type.TERMS.type)) + it + } + } catch (e: Exception) { + throw IllegalArgumentException("The ${query.name} query is invalid") } } else -> { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 6fe618d11..7232a5b66 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1155,4 +1155,150 @@ class RollupInterceptorIT : RollupRestTestCase() { rollupAggRes.getValue("min_passenger_count")["value"] ) } + + fun `test roll up search query_string query invalid query`() { + val sourceIndex = "source_rollup_search_qsq" + val targetIndex = "target_rollup_qsq_search" + generateNYCTaxiData(sourceIndex) + val rollup = Rollup( + id = "basic_query_string_query_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + 
RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "invalid query" + } + }, + "aggs": { + "min_passenger_count": { + "sum": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + fail("search should've failed due to incorrect query") + } catch (e: ResponseException) { + assertTrue("The query_string query wasn't invalid", e.message!!.contains("The query_string query is invalid")) + } + } + + fun `test roll up search query_string query unknown field`() { + val sourceIndex = "source_rollup_search_qsq" + val targetIndex = "target_rollup_qsq_search" + generateNYCTaxiData(sourceIndex) + val rollup = Rollup( + id = "basic_query_string_query_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "RatecodeID:>=1 AND unknown_field:<=10" + } + }, + "aggs": { + "min_passenger_count": { + "sum": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + fail("search should've failed due to incorrect query") + } catch (e: ResponseException) { + assertTrue("The query_string query wasn't invalid", e.message!!.contains("The query_string query is invalid")) + } + } } From cee3be97bc787fb2ab2762d4fcd35f239c950857 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 03:16:00 +0100 Subject: [PATCH 04/21] added ktlint supress for TooManyThrows Signed-off-by: Petar Dzepina --- .../indexmanagement/rollup/interceptor/RollupInterceptor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 8b56c464f..a803c13b4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -163,7 +163,7 @@ class RollupInterceptor( return fieldMappings } - @Suppress("ComplexMethod") + @Suppress("ComplexMethod","ThrowsCount") private fun getQueryMetadata( query: QueryBuilder?, concreteSourceIndexName: String, From 4c631f2fe7b187ab27f75de98251bfb3f5c554fe Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 03:17:31 +0100 Subject: [PATCH 05/21] ktlint fix Signed-off-by: Petar Dzepina --- .../indexmanagement/rollup/interceptor/RollupInterceptor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index a803c13b4..be584135f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -163,7 +163,7 @@ class RollupInterceptor( return fieldMappings } - @Suppress("ComplexMethod","ThrowsCount") + @Suppress("ComplexMethod", "ThrowsCount") private fun getQueryMetadata( query: QueryBuilder?, concreteSourceIndexName: String, From 20727aa1de7fe1456ff109a9b04ff21a7cbbf3fb Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 11:00:50 +0100 Subject: [PATCH 06/21] fixed ITs Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptorIT.kt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 7232a5b66..533706918 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1079,8 +1079,8 @@ class RollupInterceptorIT : RollupRestTestCase() { } fun `test roll up search query_string query`() { - val sourceIndex = "source_rollup_search_qsq" - val 
targetIndex = "target_rollup_qsq_search" + val sourceIndex = "source_rollup_search_qsq_1" + val targetIndex = "target_rollup_qsq_search_1" generateNYCTaxiData(sourceIndex) val rollup = Rollup( id = "basic_query_string_query_rollup_search", @@ -1157,8 +1157,8 @@ class RollupInterceptorIT : RollupRestTestCase() { } fun `test roll up search query_string query invalid query`() { - val sourceIndex = "source_rollup_search_qsq" - val targetIndex = "target_rollup_qsq_search" + val sourceIndex = "source_rollup_search_qsq_2" + val targetIndex = "target_rollup_qsq_search_2" generateNYCTaxiData(sourceIndex) val rollup = Rollup( id = "basic_query_string_query_rollup_search", @@ -1230,11 +1230,11 @@ class RollupInterceptorIT : RollupRestTestCase() { } fun `test roll up search query_string query unknown field`() { - val sourceIndex = "source_rollup_search_qsq" - val targetIndex = "target_rollup_qsq_search" + val sourceIndex = "source_rollup_search_qsq_3" + val targetIndex = "target_rollup_qsq_search_3" generateNYCTaxiData(sourceIndex) val rollup = Rollup( - id = "basic_query_string_query_rollup_search", + id = "basic_query_string_query_rollup_search_3", enabled = true, schemaVersion = 1L, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), From bccda3c71c3c9395e8bec1ae55767102933d9d65 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 20:38:51 +0100 Subject: [PATCH 07/21] detekt fixes Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 17 +++++++---------- .../indexmanagement/rollup/util/RollupUtils.kt | 7 ++++++- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index be584135f..86a6a179c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -207,27 +207,24 @@ class RollupInterceptor( query.zeroTermsQuery() != MatchQuery.DEFAULT_ZERO_TERMS_QUERY ) { throw IllegalArgumentException( - "The ${query.name} query is currently not supported with analyzer/slop/zero_terms_query in " + - "rollups" + "The ${query.name} query is currently not supported with analyzer/slop/zero_terms_query in rollups" ) } fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) } is QueryStringQueryBuilder -> { - try { - rewriteQueryStringQueryBuilder(query, concreteSourceIndexName) { - fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.toString(), Dimension.Type.TERMS.type)) - it - } - } catch (e: Exception) { - throw IllegalArgumentException("The ${query.name} query is invalid") + // Throws IllegalArgumentException if unable to parse query + rewriteQueryStringQueryBuilder(query, concreteSourceIndexName) { + fieldMappings.add( + RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.toString(), Dimension.Type.TERMS.type) + ) + it } } else -> { throw IllegalArgumentException("The ${query.name} query is currently not supported in rollups") } } - return fieldMappings } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 827e46131..b2cb0cb50 100644 --- 
a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -406,7 +406,12 @@ fun rewriteQueryStringQueryBuilder( return super.getRangeQuery(fieldRewriteFn(field), part1, part2, startInclusive, endInclusive) } } - val newLuceneQuery = parser.parse(luceneQuery.toString()) + var newLuceneQuery: Query? = null + try { + newLuceneQuery = parser.parse(luceneQuery.toString()) + } catch (e: Exception) { + throw IllegalArgumentException("The ${queryBuilder.name} query is invalid") + } return QueryStringQueryBuilder(newLuceneQuery.toString()) } From 9ca3de5e94085d10acfea4e9bf1575a312518270 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 20:40:46 +0100 Subject: [PATCH 08/21] warning fix Signed-off-by: Petar Dzepina --- .../org/opensearch/indexmanagement/rollup/util/RollupUtils.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index b2cb0cb50..5c0193d9d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -406,7 +406,7 @@ fun rewriteQueryStringQueryBuilder( return super.getRangeQuery(fieldRewriteFn(field), part1, part2, startInclusive, endInclusive) } } - var newLuceneQuery: Query? = null + var newLuceneQuery: Query? try { newLuceneQuery = parser.parse(luceneQuery.toString()) } catch (e: Exception) { From 68a248697f333c4b6cf28094955db8f0a81fc285 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 2 Dec 2022 22:28:24 +0100 Subject: [PATCH 09/21] IT fix Signed-off-by: Petar Dzepina --- .../indexmanagement/rollup/interceptor/RollupInterceptorIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 533706918..286eb3241 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1161,7 +1161,7 @@ class RollupInterceptorIT : RollupRestTestCase() { val targetIndex = "target_rollup_qsq_search_2" generateNYCTaxiData(sourceIndex) val rollup = Rollup( - id = "basic_query_string_query_rollup_search", + id = "basic_query_string_query_rollup_search_2", enabled = true, schemaVersion = 1L, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), From 1a33879fc0ab543e3f8d224c05bf6624a600d50c Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 7 Dec 2022 22:40:36 +0100 Subject: [PATCH 10/21] removed passing module and plugin dir paths when creating QueryShardContext Signed-off-by: Petar Dzepina --- .../indexmanagement/rollup/util/QueryShardContextFactory.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt index 2050459cd..536a94b0b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt @@ -75,7 +75,7 @@ object QueryShardContextFactory { 
.put(Environment.PATH_HOME_SETTING.key, environment.tmpFile()) .build() val pluginsService = - PluginsService(nodeSettings, null, environment.modulesFile(), environment.pluginsFile(), listOf()) + PluginsService(nodeSettings, null, null, null, listOf()) val additionalSettings = pluginsService.pluginSettings val settingsModule = SettingsModule( nodeSettings, From 7e9491b6e6890303471f7fa6c3d3c751a4ef2a33 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 7 Dec 2022 23:40:55 +0100 Subject: [PATCH 11/21] empty commit Signed-off-by: Petar Dzepina From 5c79eb81afa0925904daf3ac0124880825b085ca Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Thu, 8 Dec 2022 01:20:38 +0100 Subject: [PATCH 12/21] empty commit Signed-off-by: Petar Dzepina From 476d15f3a5a8ff117b34e2e8b20c24da35fc5479 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 2 Jan 2023 18:24:39 +0100 Subject: [PATCH 13/21] refactored qsq rewrite Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 10 +- .../rollup/query/QueryStringQueryParserExt.kt | 48 ++++++ .../rollup/query/QueryStringQueryUtil.kt | 157 ++++++++++++++++++ .../rollup/util/RollupUtils.kt | 45 +---- .../rollup/RollupRestTestCase.kt | 70 ++++++++ .../rollup/interceptor/RollupInterceptorIT.kt | 143 ++++++++++++++-- 6 files changed, 414 insertions(+), 59 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 86a6a179c..0ed1a7298 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -24,12 +24,12 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping.Companion.UNKNOWN_MAPPING +import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.rollup.util.getDateHistogram import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex import org.opensearch.indexmanagement.rollup.util.populateFieldMappings -import org.opensearch.indexmanagement.rollup.util.rewriteQueryStringQueryBuilder import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder import org.opensearch.search.aggregations.AggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder @@ -214,11 +214,9 @@ class RollupInterceptor( } is QueryStringQueryBuilder -> { // Throws IllegalArgumentException if unable to parse query - rewriteQueryStringQueryBuilder(query, concreteSourceIndexName) { - fieldMappings.add( - RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, it.toString(), Dimension.Type.TERMS.type) - ) - it + val fields = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) + for (field in fields) { + fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, 
field, Dimension.Type.TERMS.type)) } } else -> { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt new file mode 100644 index 000000000..8240a0a07 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.query + +import org.apache.lucene.search.Query +import org.opensearch.index.query.QueryShardContext +import org.opensearch.index.search.QueryStringQueryParser + +class QueryStringQueryParserExt : QueryStringQueryParser { + + val discoveredFields = mutableListOf() + + constructor(context: QueryShardContext?, lenient: Boolean) : super(context, lenient) + constructor(context: QueryShardContext?, defaultField: String, lenient: Boolean) : super(context, defaultField, lenient) + constructor(context: QueryShardContext, resolvedFields: Map, lenient: Boolean) : super(context, resolvedFields, lenient) + + override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { + if (field != null) discoveredFields.add(field) + return super.getFuzzyQuery(field, termStr, minSimilarity) + } + override fun getPrefixQuery(field: String?, termStr: String?): Query { + if (field != null) discoveredFields.add(field) + return super.getPrefixQuery(field, termStr) + } + override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { + if (field != null) discoveredFields.add(field) + return super.getFieldQuery(field, queryText, quoted) + } + override fun getWildcardQuery(field: String?, termStr: String?): Query { + if (field != null) discoveredFields.add(field) + return super.getWildcardQuery(field, termStr) + } + override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { + if (field != null) discoveredFields.add(field) + return super.getFieldQuery(field, queryText, slop) + } + override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { + if (field != null) discoveredFields.add(field) + return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive) + } + override fun getRegexpQuery(field: String?, termStr: String?): Query { + if (field != null) discoveredFields.add(field) + return super.getRegexpQuery(field, termStr) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt new file mode 100644 index 000000000..1057a565e --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -0,0 +1,157 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.query + +import org.apache.lucene.queryparser.classic.ParseException +import org.apache.lucene.queryparser.classic.QueryParser +import org.opensearch.common.regex.Regex +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.index.analysis.NamedAnalyzer +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryShardException +import org.opensearch.index.query.QueryStringQueryBuilder +import org.opensearch.index.query.support.QueryParsers +import 
org.opensearch.index.search.QueryParserHelper +import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.rollup.util.QueryShardContextFactory + +object QueryStringQueryUtil { + + fun rewriteQueryStringQuery( + queryBuilder: QueryBuilder, + concreteIndexName: String + ): QueryStringQueryBuilder { + val qsqBuilder = queryBuilder as QueryStringQueryBuilder + // Parse query_string query and extract all discovered fields + val fieldsFromQueryString = extractFieldsFromQueryString(queryBuilder, concreteIndexName) + // Rewrite query_string + var newQueryString = qsqBuilder.queryString() + fieldsFromQueryString.forEach { field -> + newQueryString = newQueryString.replace("$field:", "$field.${Dimension.Type.TERMS.type}:") + } + // Rewrite fields + var newDefaultField: String? = null + if (qsqBuilder.defaultField() != null) { + newDefaultField = "${qsqBuilder.defaultField()}.${Dimension.Type.TERMS.type}" + } + var newFields: MutableMap? = null + if (qsqBuilder.fields() != null && qsqBuilder.fields().size > 0) { + newFields = mutableMapOf() + qsqBuilder.fields().forEach { + newFields.put("${it.key}.${Dimension.Type.TERMS.type}", it.value) + } + } + var retVal = QueryStringQueryBuilder(newQueryString) + .defaultField(newDefaultField) + .rewrite(qsqBuilder.rewrite()) + .fuzzyRewrite(qsqBuilder.fuzzyRewrite()) + .autoGenerateSynonymsPhraseQuery(qsqBuilder.autoGenerateSynonymsPhraseQuery()) + .allowLeadingWildcard(qsqBuilder.allowLeadingWildcard()) + .analyzeWildcard(qsqBuilder.analyzeWildcard()) + .defaultOperator(qsqBuilder.defaultOperator()) + .escape(qsqBuilder.escape()) + .fuzziness(qsqBuilder.fuzziness()) + .lenient(qsqBuilder.lenient()) + .enablePositionIncrements(qsqBuilder.enablePositionIncrements()) + .fuzzyMaxExpansions(qsqBuilder.fuzzyMaxExpansions()) + .fuzzyPrefixLength(qsqBuilder.fuzzyPrefixLength()) + .queryName(qsqBuilder.queryName()) + .quoteAnalyzer(qsqBuilder.quoteAnalyzer()) + .analyzer(qsqBuilder.analyzer()) + .minimumShouldMatch(qsqBuilder.minimumShouldMatch()) + .timeZone(qsqBuilder.timeZone()) + .phraseSlop(qsqBuilder.phraseSlop()) + .quoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) + .boost(qsqBuilder.boost()) + .fuzzyTranspositions(qsqBuilder.fuzzyTranspositions()) + + if (newFields != null && newFields.size > 0) { + retVal = retVal.fields(newFields) + } + if (qsqBuilder.tieBreaker() != null) { + retVal = retVal.tieBreaker(qsqBuilder.tieBreaker()) + } + return retVal + } + + fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): List { + val context = QueryShardContextFactory.createShardContext(concreteIndexName) + val qsqBuilder = queryBuilder as QueryStringQueryBuilder + val rewrittenQueryString = if (qsqBuilder.escape()) QueryParser.escape(qsqBuilder.queryString()) else qsqBuilder.queryString() + val queryParser: QueryStringQueryParserExt + val isLenient: Boolean = if (qsqBuilder.lenient() == null) context.queryStringLenient() else qsqBuilder.lenient() + if (qsqBuilder.defaultField() != null) { + if (Regex.isMatchAllPattern(qsqBuilder.defaultField())) { + queryParser = QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else { + queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) + } + } else if (qsqBuilder.fields().size > 0) { + val resolvedFields = QueryParserHelper.resolveMappingFields(context, qsqBuilder.fields()) + queryParser = if (QueryParserHelper.hasAllFieldsWildcard(qsqBuilder.fields().keys)) { + 
QueryStringQueryParserExt(context, resolvedFields, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else { + QueryStringQueryParserExt(context, resolvedFields, isLenient) + } + } else { + val defaultFields: List = context.defaultFields() + queryParser = if (QueryParserHelper.hasAllFieldsWildcard(defaultFields)) { + QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else { + val resolvedFields = QueryParserHelper.resolveMappingFields( + context, + QueryParserHelper.parseFieldsAndWeights(defaultFields) + ) + QueryStringQueryParserExt(context, resolvedFields, isLenient) + } + } + + if (qsqBuilder.analyzer() != null) { + val namedAnalyzer: NamedAnalyzer = context.getIndexAnalyzers().get(qsqBuilder.analyzer()) + ?: throw QueryShardException(context, "[query_string] analyzer [$qsqBuilder.analyzer] not found") + queryParser.setForceAnalyzer(namedAnalyzer) + } + + if (qsqBuilder.quoteAnalyzer() != null) { + val forceQuoteAnalyzer: NamedAnalyzer = context.getIndexAnalyzers().get(qsqBuilder.quoteAnalyzer()) + ?: throw QueryShardException(context, "[query_string] quote_analyzer [$qsqBuilder.quoteAnalyzer] not found") + queryParser.setForceQuoteAnalyzer(forceQuoteAnalyzer) + } + + queryParser.defaultOperator = qsqBuilder.defaultOperator().toQueryParserOperator() + // TODO can we extract this somehow? There's no getter for this + queryParser.setType(QueryStringQueryBuilder.DEFAULT_TYPE) + if (qsqBuilder.tieBreaker() != null) { + queryParser.setGroupTieBreaker(qsqBuilder.tieBreaker()) + } else { + queryParser.setGroupTieBreaker(QueryStringQueryBuilder.DEFAULT_TYPE.tieBreaker()) + } + queryParser.phraseSlop = qsqBuilder.phraseSlop() + queryParser.setQuoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) + queryParser.setAllowLeadingWildcard( + if (qsqBuilder.allowLeadingWildcard() == null) context.queryStringAllowLeadingWildcard() else qsqBuilder.allowLeadingWildcard() + ) + queryParser.setAnalyzeWildcard(if (qsqBuilder.analyzeWildcard() == null) context.queryStringAnalyzeWildcard() else qsqBuilder.analyzeWildcard()) + queryParser.enablePositionIncrements = qsqBuilder.enablePositionIncrements() + queryParser.setFuzziness(qsqBuilder.fuzziness()) + queryParser.fuzzyPrefixLength = qsqBuilder.fuzzyPrefixLength() + queryParser.setFuzzyMaxExpansions(qsqBuilder.fuzzyMaxExpansions()) + queryParser.setFuzzyRewriteMethod(QueryParsers.parseRewriteMethod(qsqBuilder.fuzzyRewrite(), LoggingDeprecationHandler.INSTANCE)) + queryParser.multiTermRewriteMethod = QueryParsers.parseRewriteMethod(qsqBuilder.rewrite(), LoggingDeprecationHandler.INSTANCE) + queryParser.setTimeZone(qsqBuilder.timeZone()) + queryParser.determinizeWorkLimit = qsqBuilder.maxDeterminizedStates() + queryParser.autoGenerateMultiTermSynonymsPhraseQuery = qsqBuilder.autoGenerateSynonymsPhraseQuery() + queryParser.setFuzzyTranspositions(qsqBuilder.fuzzyTranspositions()) + + try { + queryParser.parse(rewrittenQueryString) + } catch (e: ParseException) { + throw QueryShardException(context, "Failed to parse query [" + qsqBuilder.queryString() + "]", e) + } + // Return discovered fields + return queryParser.discoveredFields + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 5c0193d9d..e181d32af 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -7,9 
+7,6 @@ package org.opensearch.indexmanagement.rollup.util -import org.apache.lucene.analysis.standard.StandardAnalyzer -import org.apache.lucene.queryparser.classic.QueryParser -import org.apache.lucene.search.Query import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest import org.opensearch.cluster.ClusterState @@ -45,6 +42,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Max import org.opensearch.indexmanagement.rollup.model.metric.Min import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.util.IndexUtils @@ -373,51 +371,16 @@ fun Rollup.rewriteQueryBuilder( newMatchPhraseQueryBuilder.boost(queryBuilder.boost()) } is QueryStringQueryBuilder -> { - rewriteQueryStringQueryBuilder(queryBuilder, concreteIndexName) { it + "." + Dimension.Type.TERMS.type } + QueryStringQueryUtil.rewriteQueryStringQuery(queryBuilder, concreteIndexName) } // We do nothing otherwise, the validation logic should have already verified so not throwing an exception else -> queryBuilder } } -fun rewriteQueryStringQueryBuilder( - queryBuilder: QueryBuilder, - concreteIndexName: String, - fieldRewriteFn: (String?) -> String? -): QueryStringQueryBuilder { - val luceneQuery = queryBuilder.toQuery(QueryShardContextFactory.createShardContext(concreteIndexName)) - var parser = object : QueryParser(null, StandardAnalyzer()) { - override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { - return super.getFuzzyQuery(fieldRewriteFn(field), termStr, minSimilarity) - } - override fun getPrefixQuery(field: String?, termStr: String?): Query { - return super.getPrefixQuery(fieldRewriteFn(field), termStr) - } - override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { - return super.getFieldQuery(fieldRewriteFn(field), queryText, quoted) - } - override fun getWildcardQuery(field: String?, termStr: String?): Query { - return super.getWildcardQuery(fieldRewriteFn(field), termStr) - } - override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { - return super.getFieldQuery(fieldRewriteFn(field), queryText, slop) - } - override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { - return super.getRangeQuery(fieldRewriteFn(field), part1, part2, startInclusive, endInclusive) - } - } - var newLuceneQuery: Query? 
- try { - newLuceneQuery = parser.parse(luceneQuery.toString()) - } catch (e: Exception) { - throw IllegalArgumentException("The ${queryBuilder.name} query is invalid") - } - return QueryStringQueryBuilder(newLuceneQuery.toString()) -} - -fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder, concreteIndexName: String = ""): QueryBuilder { +fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder, targetIndexName: String = ""): QueryBuilder { val wrappedQueryBuilder = BoolQueryBuilder() - wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap, concreteIndexName)) + wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap, targetIndexName)) wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id })) wrappedQueryBuilder.minimumShouldMatch(1) return wrappedQueryBuilder diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 2025798de..9f19f2a9f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -7,12 +7,14 @@ package org.opensearch.indexmanagement.rollup import org.apache.http.HttpEntity import org.apache.http.HttpHeaders +import org.apache.http.HttpStatus import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.junit.After import org.junit.AfterClass import org.junit.Before +import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient @@ -318,4 +320,72 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { ) assertEquals("Request failed", RestStatus.OK, res.restStatus()) } + + protected fun createSampleIndexForQSQTest(index: String) { + val mapping = """ + "properties": { + "event_ts": { + "type": "date" + }, + "state": { + "type": "keyword" + }, + "state_ext": { + "type": "keyword" + }, + "state_ordinal": { + "type": "long" + }, + "earnings": { + "type": "long" + } + + } + """.trimIndent() + createIndex(index, Settings.EMPTY, mapping) + + for (i in 1..5) { + val doc = """ + { + "event_ts": "2019-01-01T12:10:30Z", + "state": "TX", + "state_ext": "CA", + "state_ordinal": ${random().nextInt(0, 10)}, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + for (i in 6..8) { + val doc = """ + { + "event_ts": "2019-01-01T12:10:30Z", + "state": "TA", + "state_ext": "SE", + "state_ordinal": ${random().nextInt(0, 10)}, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + for (i in 9..11) { + val doc = """ + { + "event_ts": "2019-01-02T12:10:30Z", + "state": "CA", + "state_ext": "MA", + "state_ordinal": ${random().nextInt(0, 10)}, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + } + + protected fun indexDoc(index: String, id: String, doc: String) { + val request = Request("POST", "$index/_doc/$id/?refresh=true") + request.setJsonEntity(doc) + val resp = client().performRequest(request) + assertEquals(HttpStatus.SC_CREATED, resp.restStatus().status) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 
286eb3241..0682091c4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1081,7 +1081,9 @@ class RollupInterceptorIT : RollupRestTestCase() { fun `test roll up search query_string query`() { val sourceIndex = "source_rollup_search_qsq_1" val targetIndex = "target_rollup_qsq_search_1" - generateNYCTaxiData(sourceIndex) + + createSampleIndexForQSQTest(sourceIndex) + val rollup = Rollup( id = "basic_query_string_query_rollup_search", enabled = true, @@ -1098,19 +1100,19 @@ class RollupInterceptorIT : RollupRestTestCase() { delay = 0, continuous = false, dimensions = listOf( - DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), - Terms("RatecodeID", "RatecodeID"), - Terms("PULocationID", "PULocationID") + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ordinal", "state_ordinal"), ), metrics = listOf( RollupMetrics( - sourceField = "passenger_count", targetField = "passenger_count", + sourceField = "earnings", targetField = "earnings", metrics = listOf( Sum(), Min(), Max(), ValueCount(), Average() ) - ), - RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) ) ).let { createRollup(it, it.id) } @@ -1131,13 +1133,13 @@ class RollupInterceptorIT : RollupRestTestCase() { "size": 0, "query": { "query_string": { - "query": "RatecodeID:>=1 AND RatecodeID:<=10" + "query": "state:TX AND state_ext:CA" } }, "aggs": { - "min_passenger_count": { + "earnings_total": { "sum": { - "field": "passenger_count" + "field": "earnings" } } } @@ -1151,8 +1153,125 @@ class RollupInterceptorIT : RollupRestTestCase() { var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> assertEquals( "Source and rollup index did not return same min results", - rawAggRes.getValue("min_passenger_count")["value"], - rollupAggRes.getValue("min_passenger_count")["value"] + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // Fuzzy query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX~2" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Prefix query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:T*" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, 
ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Regex query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:/[A-Z]T/" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Range query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state_ordinal:[0 TO 10]" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] ) } From 05400cac3e4818a2f5c7e84e27d3164e5afb78b6 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 3 Jan 2023 19:56:25 +0100 Subject: [PATCH 14/21] added proper checking for fields array or default_fields Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 7 +- .../rollup/query/QueryStringQueryParserExt.kt | 14 +- .../rollup/query/QueryStringQueryUtil.kt | 60 ++++++-- .../rollup/RollupRestTestCase.kt | 12 ++ .../rollup/interceptor/RollupInterceptorIT.kt | 140 +++++++++++++++++- 5 files changed, 209 insertions(+), 24 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 0ed1a7298..d3ac820a7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -214,8 +214,11 @@ class RollupInterceptor( } is QueryStringQueryBuilder -> { // Throws IllegalArgumentException if unable to parse query - val fields = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) - for (field in fields) { + val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) + for (field in queryFields) { + 
fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, field, Dimension.Type.TERMS.type)) + } + for (field in otherFields.keys) { fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, field, Dimension.Type.TERMS.type)) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt index 8240a0a07..4f93d187c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt @@ -18,31 +18,31 @@ class QueryStringQueryParserExt : QueryStringQueryParser { constructor(context: QueryShardContext, resolvedFields: Map, lenient: Boolean) : super(context, resolvedFields, lenient) override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getFuzzyQuery(field, termStr, minSimilarity) } override fun getPrefixQuery(field: String?, termStr: String?): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getPrefixQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getFieldQuery(field, queryText, quoted) } override fun getWildcardQuery(field: String?, termStr: String?): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getWildcardQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getFieldQuery(field, queryText, slop) } override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive) } override fun getRegexpQuery(field: String?, termStr: String?): Query { - if (field != null) discoveredFields.add(field) + if (field != null && field != "*") discoveredFields.add(field) return super.getRegexpQuery(field, termStr) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt index 1057a565e..3fb82c487 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -7,10 +7,12 @@ package org.opensearch.indexmanagement.rollup.query import org.apache.lucene.queryparser.classic.ParseException import org.apache.lucene.queryparser.classic.QueryParser +import org.opensearch.OpenSearchParseException import org.opensearch.common.regex.Regex import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.index.analysis.NamedAnalyzer import org.opensearch.index.query.QueryBuilder +import 
org.opensearch.index.query.QueryShardContext import org.opensearch.index.query.QueryShardException import org.opensearch.index.query.QueryStringQueryBuilder import org.opensearch.index.query.support.QueryParsers @@ -26,26 +28,20 @@ object QueryStringQueryUtil { ): QueryStringQueryBuilder { val qsqBuilder = queryBuilder as QueryStringQueryBuilder // Parse query_string query and extract all discovered fields - val fieldsFromQueryString = extractFieldsFromQueryString(queryBuilder, concreteIndexName) + val (fieldsFromQueryString, otherFields) = extractFieldsFromQueryString(queryBuilder, concreteIndexName) // Rewrite query_string var newQueryString = qsqBuilder.queryString() fieldsFromQueryString.forEach { field -> newQueryString = newQueryString.replace("$field:", "$field.${Dimension.Type.TERMS.type}:") } - // Rewrite fields - var newDefaultField: String? = null - if (qsqBuilder.defaultField() != null) { - newDefaultField = "${qsqBuilder.defaultField()}.${Dimension.Type.TERMS.type}" - } var newFields: MutableMap? = null - if (qsqBuilder.fields() != null && qsqBuilder.fields().size > 0) { + if (otherFields.isNotEmpty()) { newFields = mutableMapOf() - qsqBuilder.fields().forEach { + otherFields.forEach { newFields.put("${it.key}.${Dimension.Type.TERMS.type}", it.value) } } var retVal = QueryStringQueryBuilder(newQueryString) - .defaultField(newDefaultField) .rewrite(qsqBuilder.rewrite()) .fuzzyRewrite(qsqBuilder.fuzzyRewrite()) .autoGenerateSynonymsPhraseQuery(qsqBuilder.autoGenerateSynonymsPhraseQuery()) @@ -77,20 +73,23 @@ object QueryStringQueryUtil { return retVal } - fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): List { + fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair, Map> { val context = QueryShardContextFactory.createShardContext(concreteIndexName) val qsqBuilder = queryBuilder as QueryStringQueryBuilder val rewrittenQueryString = if (qsqBuilder.escape()) QueryParser.escape(qsqBuilder.queryString()) else qsqBuilder.queryString() val queryParser: QueryStringQueryParserExt val isLenient: Boolean = if (qsqBuilder.lenient() == null) context.queryStringLenient() else qsqBuilder.lenient() + var otherFields = mapOf() if (qsqBuilder.defaultField() != null) { if (Regex.isMatchAllPattern(qsqBuilder.defaultField())) { + otherFields = resolveMatchAllPatternFields(context) queryParser = QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) } else { queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) } } else if (qsqBuilder.fields().size > 0) { val resolvedFields = QueryParserHelper.resolveMappingFields(context, qsqBuilder.fields()) + otherFields = resolvedFields queryParser = if (QueryParserHelper.hasAllFieldsWildcard(qsqBuilder.fields().keys)) { QueryStringQueryParserExt(context, resolvedFields, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) } else { @@ -99,12 +98,14 @@ object QueryStringQueryUtil { } else { val defaultFields: List = context.defaultFields() queryParser = if (QueryParserHelper.hasAllFieldsWildcard(defaultFields)) { + otherFields = resolveMatchAllPatternFields(context) QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) } else { val resolvedFields = QueryParserHelper.resolveMappingFields( context, QueryParserHelper.parseFieldsAndWeights(defaultFields) ) + otherFields = resolvedFields QueryStringQueryParserExt(context, resolvedFields, isLenient) } } 
@@ -131,9 +132,7 @@ object QueryStringQueryUtil { } queryParser.phraseSlop = qsqBuilder.phraseSlop() queryParser.setQuoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) - queryParser.setAllowLeadingWildcard( - if (qsqBuilder.allowLeadingWildcard() == null) context.queryStringAllowLeadingWildcard() else qsqBuilder.allowLeadingWildcard() - ) + queryParser.allowLeadingWildcard = if (qsqBuilder.allowLeadingWildcard() == null) context.queryStringAllowLeadingWildcard() else qsqBuilder.allowLeadingWildcard() queryParser.setAnalyzeWildcard(if (qsqBuilder.analyzeWildcard() == null) context.queryStringAnalyzeWildcard() else qsqBuilder.analyzeWildcard()) queryParser.enablePositionIncrements = qsqBuilder.enablePositionIncrements() queryParser.setFuzziness(qsqBuilder.fuzziness()) @@ -152,6 +151,39 @@ object QueryStringQueryUtil { throw QueryShardException(context, "Failed to parse query [" + qsqBuilder.queryString() + "]", e) } // Return discovered fields - return queryParser.discoveredFields + return queryParser.discoveredFields to otherFields + } + + fun resolveMatchAllPatternFields( + context: QueryShardContext, + ): Map { + val allFields = context.simpleMatchToIndexNames("*") + val fields: MutableMap = HashMap() + for (fieldName in allFields) { + val fieldType = context.mapperService.fieldType(fieldName) ?: continue + if (fieldType.name().startsWith("_")) { + // Ignore metadata fields + continue + } + + try { + fieldType.termQuery("", context) + } catch (e: QueryShardException) { + // field type is never searchable with term queries (eg. geo point): ignore + continue + } catch (e: UnsupportedOperationException) { + continue + } catch (e: IllegalArgumentException) { + // other exceptions are parsing errors or not indexed fields: keep + } catch (e: OpenSearchParseException) { + } + + // Deduplicate aliases and their concrete fields. 
+ val resolvedFieldName = fieldType.name() + if (allFields.contains(resolvedFieldName)) { + fields[fieldName] = 1.0f + } + } + return fields } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 9f19f2a9f..9e87345ad 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -327,6 +327,16 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": { "type": "date" }, + "test": { + "properties": { + "fff": { + "type": "keyword" + }, + "vvv": { + "type": "keyword" + } + } + }, "state": { "type": "keyword" }, @@ -348,6 +358,8 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { val doc = """ { "event_ts": "2019-01-01T12:10:30Z", + "test.fff": "12345", + "test.vvv": "54321", "state": "TX", "state_ext": "CA", "state_ordinal": ${random().nextInt(0, 10)}, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 0682091c4..2f8a3ece8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1133,8 +1133,10 @@ class RollupInterceptorIT : RollupRestTestCase() { "size": 0, "query": { "query_string": { - "query": "state:TX AND state_ext:CA" + "query": "state:TX AND state_ext:CA AND 5", + "default_field": "state_ordinal" } + }, "aggs": { "earnings_total": { @@ -1275,6 +1277,142 @@ class RollupInterceptorIT : RollupRestTestCase() { ) } + fun `test roll up search query_string query with missing fields and default_field`() { + val sourceIndex = "source_rollup_search_qsq_2" + val targetIndex = "target_rollup_qsq_search_2" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "basic_query_string_query_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ordinal", "state_ordinal"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Using ALL_MATCH_PATTERN for default_field but rollup job didn't include all fields + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "default_field": "*" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // Using ALL_MATCH_PATTERN in one of fields in "fields" array but rollup job didn't include all fields + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "fields": ["state", "*"] + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // field from "fields" list is missing in rollup + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "fields": ["test.fff"] + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue(e.message?.contains("[missing field test.fff]") ?: false) + } + } + fun `test roll up search query_string query invalid query`() { val sourceIndex = "source_rollup_search_qsq_2" val targetIndex = "target_rollup_qsq_search_2" From 30d0635c0b565fc6ef76f94ce3416b913b1419b1 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 3 Jan 2023 20:40:15 +0100 Subject: [PATCH 15/21] detekt fixes Signed-off-by: Petar Dzepina --- .../rollup/query/QueryStringQueryUtil.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt index 3fb82c487..9638798db 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -72,7 +72,7 @@ object QueryStringQueryUtil { } return retVal } - + @Suppress("ComplexMethod", "LongMethod", "ThrowsCount", "EmptyCatchBlock") fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair, Map> { val context = QueryShardContextFactory.createShardContext(concreteIndexName) val qsqBuilder = queryBuilder as QueryStringQueryBuilder @@ -132,8 +132,13 @@ object QueryStringQueryUtil { } queryParser.phraseSlop = qsqBuilder.phraseSlop() queryParser.setQuoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) - queryParser.allowLeadingWildcard = if (qsqBuilder.allowLeadingWildcard() == null) 
context.queryStringAllowLeadingWildcard() else qsqBuilder.allowLeadingWildcard() - queryParser.setAnalyzeWildcard(if (qsqBuilder.analyzeWildcard() == null) context.queryStringAnalyzeWildcard() else qsqBuilder.analyzeWildcard()) + queryParser.allowLeadingWildcard = + if (qsqBuilder.allowLeadingWildcard() == null) context.queryStringAllowLeadingWildcard() + else qsqBuilder.allowLeadingWildcard() + queryParser.setAnalyzeWildcard( + if (qsqBuilder.analyzeWildcard() == null) context.queryStringAnalyzeWildcard() + else qsqBuilder.analyzeWildcard() + ) queryParser.enablePositionIncrements = qsqBuilder.enablePositionIncrements() queryParser.setFuzziness(qsqBuilder.fuzziness()) queryParser.fuzzyPrefixLength = qsqBuilder.fuzzyPrefixLength() @@ -154,6 +159,7 @@ object QueryStringQueryUtil { return queryParser.discoveredFields to otherFields } + @Suppress("EmptyCatchBlock", "LoopWithTooManyJumpStatements") fun resolveMatchAllPatternFields( context: QueryShardContext, ): Map { From 50ce80c2cc22832fc56350630fd7faf74fc6ea49 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 3 Jan 2023 21:03:41 +0100 Subject: [PATCH 16/21] test compile fix Signed-off-by: Petar Dzepina --- .../opensearch/indexmanagement/rollup/RollupRestTestCase.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 9e87345ad..a7eb988b8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -362,7 +362,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "test.vvv": "54321", "state": "TX", "state_ext": "CA", - "state_ordinal": ${random().nextInt(0, 10)}, + "state_ordinal": ${random().nextInt(10)}, "earnings": $i } """.trimIndent() @@ -374,7 +374,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-01T12:10:30Z", "state": "TA", "state_ext": "SE", - "state_ordinal": ${random().nextInt(0, 10)}, + "state_ordinal": ${random().nextInt(10)}, "earnings": $i } """.trimIndent() From 92cc9489e61e307641b6ce127dd796699fc25a0b Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 3 Jan 2023 21:14:05 +0100 Subject: [PATCH 17/21] compile test fix Signed-off-by: Petar Dzepina --- .../org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index a7eb988b8..fa505a3ce 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -386,7 +386,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-02T12:10:30Z", "state": "CA", "state_ext": "MA", - "state_ordinal": ${random().nextInt(0, 10)}, + "state_ordinal": ${random().nextInt(10)}, "earnings": $i } """.trimIndent() From 05c65a707d70e90b1594fd2bda5171591a1eb377 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 3 Jan 2023 22:27:18 +0100 Subject: [PATCH 18/21] added test for falling back on index settings default_field Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptorIT.kt | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git 
a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 2f8a3ece8..c47a81f0a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1277,7 +1277,7 @@ class RollupInterceptorIT : RollupRestTestCase() { ) } - fun `test roll up search query_string query with missing fields and default_field`() { + fun `test roll up search query_string query with missing fields in fields and default_field`() { val sourceIndex = "source_rollup_search_qsq_2" val targetIndex = "target_rollup_qsq_search_2" @@ -1411,6 +1411,82 @@ class RollupInterceptorIT : RollupRestTestCase() { } catch (e: ResponseException) { assertTrue(e.message?.contains("[missing field test.fff]") ?: false) } + + // no fields or default_field present. Fallback on index setting [index.query.default_field] default value: "*" + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // fallback on index settings index.query.default_field:state_ordinal + client().makeRequest( + "PUT", "$sourceIndex/_settings", + StringEntity( + """ + { + "index": { + "query": { + "default_field":"state_ordinal" + } + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 7" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + val rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + val rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + val rawAggRes = rawRes.asMap()["aggregations"] as Map> + val rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) } fun `test roll up search query_string query invalid query`() { From 9cf234e358594dace22fe16c8200cf73d5ced499 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 4 Jan 2023 01:56:17 +0100 Subject: [PATCH 19/21] fixed regarding fields Signed-off-by: Petar Dzepina --- .../rollup/query/QueryStringQueryParserExt.kt | 22 +++++++++++++------ .../rollup/query/QueryStringQueryUtil.kt | 16 +++++++++++--- .../rollup/RollupRestTestCase.kt | 6 ++--- .../rollup/interceptor/RollupInterceptorIT.kt | 14 ++++++------ 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt 
b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt index 4f93d187c..dcb39b719 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt @@ -12,37 +12,45 @@ import org.opensearch.index.search.QueryStringQueryParser class QueryStringQueryParserExt : QueryStringQueryParser { val discoveredFields = mutableListOf() + var hasLonelyTerms = false constructor(context: QueryShardContext?, lenient: Boolean) : super(context, lenient) constructor(context: QueryShardContext?, defaultField: String, lenient: Boolean) : super(context, defaultField, lenient) constructor(context: QueryShardContext, resolvedFields: Map, lenient: Boolean) : super(context, resolvedFields, lenient) override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getFuzzyQuery(field, termStr, minSimilarity) } override fun getPrefixQuery(field: String?, termStr: String?): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getPrefixQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getFieldQuery(field, queryText, quoted) } override fun getWildcardQuery(field: String?, termStr: String?): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getWildcardQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getFieldQuery(field, queryText, slop) } override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive) } override fun getRegexpQuery(field: String?, termStr: String?): Query { - if (field != null && field != "*") discoveredFields.add(field) + if (field == null) hasLonelyTerms = true + else if (field != "*") discoveredFields.add(field) return super.getRegexpQuery(field, termStr) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt index 9638798db..639eef33d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -34,6 +34,14 @@ object QueryStringQueryUtil { fieldsFromQueryString.forEach { field -> newQueryString = newQueryString.replace("$field:", "$field.${Dimension.Type.TERMS.type}:") 
} + + var newDefaultField = qsqBuilder.defaultField() + if (newDefaultField != null && newDefaultField != "*") { + newDefaultField = newDefaultField + ".${Dimension.Type.TERMS.type}" + } else { + newDefaultField = null + } + var newFields: MutableMap? = null if (otherFields.isNotEmpty()) { newFields = mutableMapOf() @@ -64,7 +72,9 @@ object QueryStringQueryUtil { .boost(qsqBuilder.boost()) .fuzzyTranspositions(qsqBuilder.fuzzyTranspositions()) - if (newFields != null && newFields.size > 0) { + if (newDefaultField != null) { + retVal = retVal.defaultField(newDefaultField) + } else if (newFields != null && newFields.size > 0) { retVal = retVal.fields(newFields) } if (qsqBuilder.tieBreaker() != null) { @@ -153,10 +163,10 @@ object QueryStringQueryUtil { try { queryParser.parse(rewrittenQueryString) } catch (e: ParseException) { - throw QueryShardException(context, "Failed to parse query [" + qsqBuilder.queryString() + "]", e) + throw IllegalArgumentException("Failed to parse query [" + qsqBuilder.queryString() + "]", e) } // Return discovered fields - return queryParser.discoveredFields to otherFields + return queryParser.discoveredFields to if (queryParser.hasLonelyTerms) otherFields else mapOf() } @Suppress("EmptyCatchBlock", "LoopWithTooManyJumpStatements") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index fa505a3ce..c36e33f19 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -362,7 +362,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "test.vvv": "54321", "state": "TX", "state_ext": "CA", - "state_ordinal": ${random().nextInt(10)}, + "state_ordinal": ${i % 3}, "earnings": $i } """.trimIndent() @@ -374,7 +374,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-01T12:10:30Z", "state": "TA", "state_ext": "SE", - "state_ordinal": ${random().nextInt(10)}, + "state_ordinal": ${i % 3}, "earnings": $i } """.trimIndent() @@ -386,7 +386,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-02T12:10:30Z", "state": "CA", "state_ext": "MA", - "state_ordinal": ${random().nextInt(10)}, + "state_ordinal": ${i % 3}, "earnings": $i } """.trimIndent() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index c47a81f0a..fbc471b1d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1085,7 +1085,7 @@ class RollupInterceptorIT : RollupRestTestCase() { createSampleIndexForQSQTest(sourceIndex) val rollup = Rollup( - id = "basic_query_string_query_rollup_search", + id = "basic_query_string_query_rollup_search111", enabled = true, schemaVersion = 1L, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), @@ -1133,7 +1133,7 @@ class RollupInterceptorIT : RollupRestTestCase() { "size": 0, "query": { "query_string": { - "query": "state:TX AND state_ext:CA AND 5", + "query": "state:TX AND state_ext:CA AND 0", "default_field": "state_ordinal" } @@ -1278,8 +1278,8 @@ class RollupInterceptorIT : RollupRestTestCase() { } fun `test roll up search query_string query with missing 
fields in fields and default_field`() { - val sourceIndex = "source_rollup_search_qsq_2" - val targetIndex = "target_rollup_qsq_search_2" + val sourceIndex = "source_rollup_search_qsq_22" + val targetIndex = "target_rollup_qsq_search_22" createSampleIndexForQSQTest(sourceIndex) @@ -1542,7 +1542,7 @@ class RollupInterceptorIT : RollupRestTestCase() { "size": 0, "query": { "query_string": { - "query": "invalid query" + "query": "::!invalid+-+-::query:::" } }, "aggs": { @@ -1558,7 +1558,7 @@ class RollupInterceptorIT : RollupRestTestCase() { client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) fail("search should've failed due to incorrect query") } catch (e: ResponseException) { - assertTrue("The query_string query wasn't invalid", e.message!!.contains("The query_string query is invalid")) + assertTrue("The query_string query wasn't invalid", e.message!!.contains("Failed to parse query")) } } @@ -1631,7 +1631,7 @@ class RollupInterceptorIT : RollupRestTestCase() { client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) fail("search should've failed due to incorrect query") } catch (e: ResponseException) { - assertTrue("The query_string query wasn't invalid", e.message!!.contains("The query_string query is invalid")) + assertTrue("The query_string query wasn't invalid", e.message!!.contains("[missing field unknown_field]")) } } } From 4c7b9abd0725a295323faedba96fae16d011fe44 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 9 Jan 2023 13:24:21 +0100 Subject: [PATCH 20/21] added handling of _exists_ and prefix fields in default_field Signed-off-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 7 +- .../rollup/query/QueryStringQueryParserExt.kt | 33 +-- .../rollup/query/QueryStringQueryUtil.kt | 26 ++- .../rollup/util/QueryShardContextFactory.kt | 24 +-- .../indexmanagement/util/IndexUtils.kt | 60 ++++++ .../rollup/RollupRestTestCase.kt | 16 +- .../rollup/interceptor/RollupInterceptorIT.kt | 190 ++++++++++++------ 7 files changed, 264 insertions(+), 92 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index d3ac820a7..ad30f4821 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex import org.opensearch.indexmanagement.rollup.util.populateFieldMappings import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder +import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.search.aggregations.AggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval @@ -86,9 +87,13 @@ class RollupInterceptor( val indices = request.indices().map { it.toString() }.toTypedArray() val concreteIndices = indexNameExpressionResolver .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + // To extract fields from QueryStringQueryBuilder we need concrete source index name. 
val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) ?: throw IllegalArgumentException("No rollup job associated with target_index") - val queryFieldMappings = getQueryMetadata(request.source().query(), rollupJob.sourceIndex) + val queryFieldMappings = getQueryMetadata( + request.source().query(), + IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state()) + ) val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) val fieldMappings = queryFieldMappings + aggregationFieldMappings diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt index dcb39b719..ed1168e8f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt @@ -6,9 +6,12 @@ package org.opensearch.indexmanagement.rollup.query import org.apache.lucene.search.Query +import org.opensearch.common.regex.Regex import org.opensearch.index.query.QueryShardContext import org.opensearch.index.search.QueryStringQueryParser +const val EXISTS = "_exists_" + class QueryStringQueryParserExt : QueryStringQueryParser { val discoveredFields = mutableListOf() @@ -19,38 +22,40 @@ class QueryStringQueryParserExt : QueryStringQueryParser { constructor(context: QueryShardContext, resolvedFields: Map, lenient: Boolean) : super(context, resolvedFields, lenient) override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field) return super.getFuzzyQuery(field, termStr, minSimilarity) } override fun getPrefixQuery(field: String?, termStr: String?): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field) return super.getPrefixQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field, queryText) return super.getFieldQuery(field, queryText, quoted) } override fun getWildcardQuery(field: String?, termStr: String?): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field) return super.getWildcardQuery(field, termStr) } override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field, queryText) return super.getFieldQuery(field, queryText, slop) } override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field) return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive) } override fun getRegexpQuery(field: String?, termStr: String?): Query { - if (field == null) hasLonelyTerms = true - else if (field != "*") discoveredFields.add(field) + handleFieldQueryDiscovered(field) return super.getRegexpQuery(field, termStr) 
} + + private fun handleFieldQueryDiscovered(field: String?, queryText: String? = null) { + if (field == null || Regex.isSimpleMatchPattern(field)) { + hasLonelyTerms = true + } else { + if (field == EXISTS && queryText?.isNotEmpty() == true) discoveredFields.add(queryText) + else discoveredFields.add(field) + } + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt index 639eef33d..a82197d5a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -32,11 +32,15 @@ object QueryStringQueryUtil { // Rewrite query_string var newQueryString = qsqBuilder.queryString() fieldsFromQueryString.forEach { field -> - newQueryString = newQueryString.replace("$field:", "$field.${Dimension.Type.TERMS.type}:") + val escapedField = escapeSpaceCharacters(field) + newQueryString = newQueryString.replace("$escapedField:", "$escapedField.${Dimension.Type.TERMS.type}:") + newQueryString = newQueryString.replace("$EXISTS:$escapedField", "$EXISTS:$escapedField.${Dimension.Type.TERMS.type}") } + // We will rewrite here only concrete default fields. + // Prefix ones we will resolve(otherFields) and insert into fields array var newDefaultField = qsqBuilder.defaultField() - if (newDefaultField != null && newDefaultField != "*") { + if (newDefaultField != null && Regex.isSimpleMatchPattern(newDefaultField) == false) { newDefaultField = newDefaultField + ".${Dimension.Type.TERMS.type}" } else { newDefaultField = null @@ -82,6 +86,12 @@ object QueryStringQueryUtil { } return retVal } + + private fun escapeSpaceCharacters(field: String): String { + val escapedField = field.replace(" ", "\\ ") + return escapedField + } + @Suppress("ComplexMethod", "LongMethod", "ThrowsCount", "EmptyCatchBlock") fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair, Map> { val context = QueryShardContextFactory.createShardContext(concreteIndexName) @@ -92,8 +102,11 @@ object QueryStringQueryUtil { var otherFields = mapOf() if (qsqBuilder.defaultField() != null) { if (Regex.isMatchAllPattern(qsqBuilder.defaultField())) { - otherFields = resolveMatchAllPatternFields(context) + otherFields = resolveMatchPatternFields(context) queryParser = QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else if (Regex.isSimpleMatchPattern(qsqBuilder.defaultField())) { + otherFields = resolveMatchPatternFields(context, qsqBuilder.defaultField()) + queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) } else { queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) } @@ -108,7 +121,7 @@ object QueryStringQueryUtil { } else { val defaultFields: List = context.defaultFields() queryParser = if (QueryParserHelper.hasAllFieldsWildcard(defaultFields)) { - otherFields = resolveMatchAllPatternFields(context) + otherFields = resolveMatchPatternFields(context) QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) } else { val resolvedFields = QueryParserHelper.resolveMappingFields( @@ -170,10 +183,11 @@ object QueryStringQueryUtil { } @Suppress("EmptyCatchBlock", "LoopWithTooManyJumpStatements") - fun resolveMatchAllPatternFields( + fun resolveMatchPatternFields( context: QueryShardContext, + pattern: 
String = "*" ): Map { - val allFields = context.simpleMatchToIndexNames("*") + val allFields = context.simpleMatchToIndexNames(pattern) val fields: MutableMap = HashMap() for (fieldName in allFields) { val fieldType = context.mapperService.fieldType(fieldName) ?: continue diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt index 536a94b0b..b34b535a0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt @@ -29,6 +29,10 @@ import org.opensearch.plugins.PluginsService import org.opensearch.script.ScriptService import java.time.Instant +/** + * Creates QueryShardContext object which is used in QueryStringQuery rewrite. + * We need this because we have to use QueryStringQueryParser class which requires QueryShardContext as parameter + */ object QueryShardContextFactory { lateinit var client: Client lateinit var clusterService: ClusterService @@ -54,17 +58,13 @@ object QueryShardContextFactory { this.environment = environment } - fun getIndexSettingsAndMetadata(indexName: String?): Triple { - var index: Index? - var indexSettings: Settings? + private fun getIndexSettingsAndMetadata(indexName: String?): Triple { + val index: Index? + val indexSettings: Settings? val indexMetadata = clusterService.state().metadata.index(indexName) - if (indexMetadata != null) { - index = indexMetadata.index - indexSettings = indexMetadata.settings - } else { - index = Index("dummyIndexName", "randomindexuuid123456") - indexSettings = Settings.EMPTY - } + ?: throw IllegalArgumentException("Can't find IndexMetadata for index: [$indexName]") + index = indexMetadata.index + indexSettings = indexMetadata.settings return Triple(index, indexSettings, indexMetadata) } @@ -82,7 +82,7 @@ object QueryShardContextFactory { additionalSettings, pluginsService.pluginSettingsFilter, emptySet() ) - val indexScopedSettings: IndexScopedSettings = settingsModule.getIndexScopedSettings() + val indexScopedSettings: IndexScopedSettings = settingsModule.indexScopedSettings val idxSettings = newIndexSettings(index, indexSettings, indexScopedSettings) val indicesModule = IndicesModule(pluginsService.filterPlugins(MapperPlugin::class.java)) val mapperRegistry = indicesModule.mapperRegistry @@ -123,7 +123,7 @@ object QueryShardContextFactory { ) } - fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? { + private fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? 
{ val build = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index 31a7ea367..aa7d13d37 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -193,5 +193,65 @@ class IndexUtils { val byteArray = ByteBuffer.allocate(BYTE_ARRAY_SIZE).putLong(hash.h1).putLong(hash.h2).array() return Base64.getUrlEncoder().withoutPadding().encodeToString(byteArray) } + + fun isDataStream(name: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata.dataStreams().containsKey(name) + } + + fun isAlias(indexName: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata.hasAlias(indexName) + } + + fun getWriteIndex(indexName: String?, clusterState: ClusterState): String? { + if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) { + val writeIndexMetadata = clusterState.metadata + .indicesLookup[indexName]!!.writeIndex + if (writeIndexMetadata != null) { + return writeIndexMetadata.index.name + } + } + return null + } + + fun getNewestIndexByCreationDate(concreteIndices: Array, clusterState: ClusterState): String { + val lookup = clusterState.metadata.indicesLookup + var maxCreationDate = Long.MIN_VALUE + var newestIndex: String = concreteIndices[0] + for (indexName in concreteIndices) { + val index = lookup[indexName] + val indexMetadata = clusterState.metadata.index(indexName) + if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) { + if (indexMetadata.creationDate > maxCreationDate) { + maxCreationDate = indexMetadata.creationDate + newestIndex = indexName + } + } + } + return newestIndex + } + + fun isConcreteIndex(indexName: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata + .indicesLookup[indexName]!!.type == IndexAbstraction.Type.CONCRETE_INDEX + } + + fun getConcreteIndex(indexName: String, concreteIndices: Array, clusterState: ClusterState): String { + + if (concreteIndices.isEmpty()) { + throw IllegalArgumentException("ConcreteIndices list can't be empty!") + } + + var concreteIndexName: String + if (concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState)) { + concreteIndexName = indexName + } else if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) { + concreteIndexName = getWriteIndex(indexName, clusterState) + ?: getNewestIndexByCreationDate(concreteIndices, clusterState) // + } else { + concreteIndexName = getNewestIndexByCreationDate(concreteIndices, clusterState) + } + + return concreteIndexName + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index c36e33f19..9c33dc8bc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -343,9 +343,15 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "state_ext": { "type": "keyword" }, + "state_ext2": { + "type": "keyword" + }, "state_ordinal": { "type": "long" }, + "abc test": { + "type": "long" + }, "earnings": { "type": "long" } @@ -362,6 +368,8 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { 
"test.vvv": "54321", "state": "TX", "state_ext": "CA", + "state_ext2": "TX", + "abc test": 123, "state_ordinal": ${i % 3}, "earnings": $i } @@ -374,7 +382,9 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-01T12:10:30Z", "state": "TA", "state_ext": "SE", - "state_ordinal": ${i % 3}, + "state_ext2": "CA", + "state_ordinal": ${i % 3}, + "abc test": 123, "earnings": $i } """.trimIndent() @@ -386,7 +396,9 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { "event_ts": "2019-01-02T12:10:30Z", "state": "CA", "state_ext": "MA", - "state_ordinal": ${i % 3}, + "state_ext2": "CA", + "state_ordinal": ${i % 3}, + "abc test": 123, "earnings": $i } """.trimIndent() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index fbc471b1d..f554f3e42 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1103,7 +1103,9 @@ class RollupInterceptorIT : RollupRestTestCase() { DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), Terms("state", "state"), Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), ), metrics = listOf( RollupMetrics( @@ -1275,59 +1277,39 @@ class RollupInterceptorIT : RollupRestTestCase() { rawAggRes.getValue("earnings_total")["value"], rollupAggRes.getValue("earnings_total")["value"] ) - } - - fun `test roll up search query_string query with missing fields in fields and default_field`() { - val sourceIndex = "source_rollup_search_qsq_22" - val targetIndex = "target_rollup_qsq_search_22" - - createSampleIndexForQSQTest(sourceIndex) - - val rollup = Rollup( - id = "basic_query_string_query_rollup_search", - enabled = true, - schemaVersion = 1L, - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - jobLastUpdatedTime = Instant.now(), - jobEnabledTime = Instant.now(), - description = "basic search test", - sourceIndex = sourceIndex, - targetIndex = targetIndex, - metadataID = null, - roles = emptyList(), - pageSize = 10, - delay = 0, - continuous = false, - dimensions = listOf( - DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), - Terms("state", "state"), - Terms("state_ext", "state_ext"), - Terms("state_ordinal", "state_ordinal"), - ), - metrics = listOf( - RollupMetrics( - sourceField = "earnings", targetField = "earnings", - metrics = listOf( - Sum(), Min(), Max(), - ValueCount(), Average() - ) - ) - ) - ).let { createRollup(it, it.id) } - - updateRollupStartTime(rollup) - - waitFor { - val rollupJob = getRollup(rollupId = rollup.id) - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
- assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) - } - - refreshAllIndices() + // Query with field prefix + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "123", + "default_field":"abc*" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) // Using ALL_MATCH_PATTERN for default_field but rollup job didn't include all fields - var req = """ + req = """ { "size": 0, "query": { @@ -1457,7 +1439,7 @@ class RollupInterceptorIT : RollupRestTestCase() { ContentType.APPLICATION_JSON ) ) - + // req = """ { "size": 0, @@ -1476,12 +1458,106 @@ class RollupInterceptorIT : RollupRestTestCase() { } } """.trimIndent() - val rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) assertTrue(rawRes.restStatus() == RestStatus.OK) - val rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) assertTrue(rollupRes.restStatus() == RestStatus.OK) - val rawAggRes = rawRes.asMap()["aggregations"] as Map> - val rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // prefix pattern in "default_field" field + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "TX AND CA", + "default_field": "state_e*" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // field with space in query: + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "abc\\ test:123" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": 
"earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // _exists_:field + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "_exists_:abc\\ test" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> assertEquals( "Source and rollup index did not return same min results", rawAggRes.getValue("earnings_total")["value"], @@ -1536,7 +1612,7 @@ class RollupInterceptorIT : RollupRestTestCase() { refreshAllIndices() - // Term query + // Invalid query var req = """ { "size": 0, From 05a559fd33afc92dd13ea7120df20b62f32381b6 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 9 Jan 2023 13:46:56 +0100 Subject: [PATCH 21/21] empty commit Signed-off-by: Petar Dzepina