diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index cc9ba8c38..75a0f536a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -112,6 +112,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings +import org.opensearch.indexmanagement.rollup.util.QueryShardContextFactory import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler @@ -359,6 +360,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ): Collection { val settings = environment.settings() this.clusterService = clusterService + QueryShardContextFactory.init( + client, + clusterService, + scriptService, + xContentRegistry, + namedWriteableRegistry, + environment + ) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) val transformRunner = TransformRunner.initialize( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 030b3fb5b..316451ac7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -15,6 +15,7 @@ import org.opensearch.index.query.DisMaxQueryBuilder import org.opensearch.index.query.MatchAllQueryBuilder import org.opensearch.index.query.MatchPhraseQueryBuilder import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryStringQueryBuilder import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.query.TermsQueryBuilder @@ -23,12 +24,14 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping.Companion.UNKNOWN_MAPPING +import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.rollup.util.getDateHistogram import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex import org.opensearch.indexmanagement.rollup.util.populateFieldMappings import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder +import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.search.aggregations.AggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval @@ -84,8 +87,13 @@ class RollupInterceptor( val indices = request.indices().map { it.toString() }.toTypedArray() val concreteIndices = indexNameExpressionResolver .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) - - val queryFieldMappings = getQueryMetadata(request.source().query()) + // To extract fields from QueryStringQueryBuilder we need concrete source index name. + val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) + ?: throw IllegalArgumentException("No rollup job associated with target_index") + val queryFieldMappings = getQueryMetadata( + request.source().query(), + IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state()) + ) val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) val fieldMappings = queryFieldMappings + aggregationFieldMappings @@ -160,9 +168,10 @@ class RollupInterceptor( return fieldMappings } - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "ThrowsCount") private fun getQueryMetadata( query: QueryBuilder?, + concreteSourceIndexName: String, fieldMappings: MutableSet = mutableSetOf() ): Set { if (query == null) { @@ -183,32 +192,41 @@ class RollupInterceptor( // do nothing } is BoolQueryBuilder -> { - query.must()?.forEach { this.getQueryMetadata(it, fieldMappings) } - query.mustNot()?.forEach { this.getQueryMetadata(it, fieldMappings) } - query.should()?.forEach { this.getQueryMetadata(it, fieldMappings) } - query.filter()?.forEach { this.getQueryMetadata(it, fieldMappings) } + query.must()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } + query.mustNot()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } + query.should()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } + query.filter()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } } is BoostingQueryBuilder -> { - this.getQueryMetadata(query.positiveQuery(), fieldMappings) - this.getQueryMetadata(query.negativeQuery(), fieldMappings) + this.getQueryMetadata(query.positiveQuery(), concreteSourceIndexName, fieldMappings) + this.getQueryMetadata(query.negativeQuery(), concreteSourceIndexName, fieldMappings) } is ConstantScoreQueryBuilder -> { - this.getQueryMetadata(query.innerQuery(), fieldMappings) + this.getQueryMetadata(query.innerQuery(), concreteSourceIndexName, fieldMappings) } is DisMaxQueryBuilder -> { - query.innerQueries().forEach { this.getQueryMetadata(it, fieldMappings) } + query.innerQueries().forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } } is MatchPhraseQueryBuilder -> { if (!query.analyzer().isNullOrEmpty() || query.slop() != MatchQuery.DEFAULT_PHRASE_SLOP || query.zeroTermsQuery() != MatchQuery.DEFAULT_ZERO_TERMS_QUERY ) { throw IllegalArgumentException( - "The ${query.name} query is currently not supported with analyzer/slop/zero_terms_query in " + - "rollups" + "The ${query.name} query is currently not supported with analyzer/slop/zero_terms_query in rollups" ) } fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) } + is QueryStringQueryBuilder -> { + // Throws IllegalArgumentException if unable to parse query + val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) + for (field in queryFields) { + fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, field, Dimension.Type.TERMS.type)) + } + for (field in otherFields.keys) { + fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, field, Dimension.Type.TERMS.type)) + } + } else -> { throw IllegalArgumentException("The ${query.name} query is currently not supported in rollups") } @@ -285,9 +303,9 @@ class RollupInterceptor( val matchedRollup = pickRollupJob(matchingRollupJobs.keys) val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType }) if (searchAllJobs) { - request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap)) + request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, matchedRollup.sourceIndex)) } else { - request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap)) + request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, matchedRollup.sourceIndex)) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt new file mode 100644 index 000000000..ed1168e8f --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryParserExt.kt @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.query + +import org.apache.lucene.search.Query +import org.opensearch.common.regex.Regex +import org.opensearch.index.query.QueryShardContext +import org.opensearch.index.search.QueryStringQueryParser + +const val EXISTS = "_exists_" + +class QueryStringQueryParserExt : QueryStringQueryParser { + + val discoveredFields = mutableListOf() + var hasLonelyTerms = false + + constructor(context: QueryShardContext?, lenient: Boolean) : super(context, lenient) + constructor(context: QueryShardContext?, defaultField: String, lenient: Boolean) : super(context, defaultField, lenient) + constructor(context: QueryShardContext, resolvedFields: Map, lenient: Boolean) : super(context, resolvedFields, lenient) + + override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? { + handleFieldQueryDiscovered(field) + return super.getFuzzyQuery(field, termStr, minSimilarity) + } + override fun getPrefixQuery(field: String?, termStr: String?): Query { + handleFieldQueryDiscovered(field) + return super.getPrefixQuery(field, termStr) + } + override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query { + handleFieldQueryDiscovered(field, queryText) + return super.getFieldQuery(field, queryText, quoted) + } + override fun getWildcardQuery(field: String?, termStr: String?): Query { + handleFieldQueryDiscovered(field) + return super.getWildcardQuery(field, termStr) + } + override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query { + handleFieldQueryDiscovered(field, queryText) + return super.getFieldQuery(field, queryText, slop) + } + override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query { + handleFieldQueryDiscovered(field) + return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive) + } + override fun getRegexpQuery(field: String?, termStr: String?): Query { + handleFieldQueryDiscovered(field) + return super.getRegexpQuery(field, termStr) + } + + private fun handleFieldQueryDiscovered(field: String?, queryText: String? = null) { + if (field == null || Regex.isSimpleMatchPattern(field)) { + hasLonelyTerms = true + } else { + if (field == EXISTS && queryText?.isNotEmpty() == true) discoveredFields.add(queryText) + else discoveredFields.add(field) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt new file mode 100644 index 000000000..a82197d5a --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -0,0 +1,219 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.query + +import org.apache.lucene.queryparser.classic.ParseException +import org.apache.lucene.queryparser.classic.QueryParser +import org.opensearch.OpenSearchParseException +import org.opensearch.common.regex.Regex +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.index.analysis.NamedAnalyzer +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryShardContext +import org.opensearch.index.query.QueryShardException +import org.opensearch.index.query.QueryStringQueryBuilder +import org.opensearch.index.query.support.QueryParsers +import org.opensearch.index.search.QueryParserHelper +import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.rollup.util.QueryShardContextFactory + +object QueryStringQueryUtil { + + fun rewriteQueryStringQuery( + queryBuilder: QueryBuilder, + concreteIndexName: String + ): QueryStringQueryBuilder { + val qsqBuilder = queryBuilder as QueryStringQueryBuilder + // Parse query_string query and extract all discovered fields + val (fieldsFromQueryString, otherFields) = extractFieldsFromQueryString(queryBuilder, concreteIndexName) + // Rewrite query_string + var newQueryString = qsqBuilder.queryString() + fieldsFromQueryString.forEach { field -> + val escapedField = escapeSpaceCharacters(field) + newQueryString = newQueryString.replace("$escapedField:", "$escapedField.${Dimension.Type.TERMS.type}:") + newQueryString = newQueryString.replace("$EXISTS:$escapedField", "$EXISTS:$escapedField.${Dimension.Type.TERMS.type}") + } + + // We will rewrite here only concrete default fields. + // Prefix ones we will resolve(otherFields) and insert into fields array + var newDefaultField = qsqBuilder.defaultField() + if (newDefaultField != null && Regex.isSimpleMatchPattern(newDefaultField) == false) { + newDefaultField = newDefaultField + ".${Dimension.Type.TERMS.type}" + } else { + newDefaultField = null + } + + var newFields: MutableMap? = null + if (otherFields.isNotEmpty()) { + newFields = mutableMapOf() + otherFields.forEach { + newFields.put("${it.key}.${Dimension.Type.TERMS.type}", it.value) + } + } + var retVal = QueryStringQueryBuilder(newQueryString) + .rewrite(qsqBuilder.rewrite()) + .fuzzyRewrite(qsqBuilder.fuzzyRewrite()) + .autoGenerateSynonymsPhraseQuery(qsqBuilder.autoGenerateSynonymsPhraseQuery()) + .allowLeadingWildcard(qsqBuilder.allowLeadingWildcard()) + .analyzeWildcard(qsqBuilder.analyzeWildcard()) + .defaultOperator(qsqBuilder.defaultOperator()) + .escape(qsqBuilder.escape()) + .fuzziness(qsqBuilder.fuzziness()) + .lenient(qsqBuilder.lenient()) + .enablePositionIncrements(qsqBuilder.enablePositionIncrements()) + .fuzzyMaxExpansions(qsqBuilder.fuzzyMaxExpansions()) + .fuzzyPrefixLength(qsqBuilder.fuzzyPrefixLength()) + .queryName(qsqBuilder.queryName()) + .quoteAnalyzer(qsqBuilder.quoteAnalyzer()) + .analyzer(qsqBuilder.analyzer()) + .minimumShouldMatch(qsqBuilder.minimumShouldMatch()) + .timeZone(qsqBuilder.timeZone()) + .phraseSlop(qsqBuilder.phraseSlop()) + .quoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) + .boost(qsqBuilder.boost()) + .fuzzyTranspositions(qsqBuilder.fuzzyTranspositions()) + + if (newDefaultField != null) { + retVal = retVal.defaultField(newDefaultField) + } else if (newFields != null && newFields.size > 0) { + retVal = retVal.fields(newFields) + } + if (qsqBuilder.tieBreaker() != null) { + retVal = retVal.tieBreaker(qsqBuilder.tieBreaker()) + } + return retVal + } + + private fun escapeSpaceCharacters(field: String): String { + val escapedField = field.replace(" ", "\\ ") + return escapedField + } + + @Suppress("ComplexMethod", "LongMethod", "ThrowsCount", "EmptyCatchBlock") + fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair, Map> { + val context = QueryShardContextFactory.createShardContext(concreteIndexName) + val qsqBuilder = queryBuilder as QueryStringQueryBuilder + val rewrittenQueryString = if (qsqBuilder.escape()) QueryParser.escape(qsqBuilder.queryString()) else qsqBuilder.queryString() + val queryParser: QueryStringQueryParserExt + val isLenient: Boolean = if (qsqBuilder.lenient() == null) context.queryStringLenient() else qsqBuilder.lenient() + var otherFields = mapOf() + if (qsqBuilder.defaultField() != null) { + if (Regex.isMatchAllPattern(qsqBuilder.defaultField())) { + otherFields = resolveMatchPatternFields(context) + queryParser = QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else if (Regex.isSimpleMatchPattern(qsqBuilder.defaultField())) { + otherFields = resolveMatchPatternFields(context, qsqBuilder.defaultField()) + queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) + } else { + queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient) + } + } else if (qsqBuilder.fields().size > 0) { + val resolvedFields = QueryParserHelper.resolveMappingFields(context, qsqBuilder.fields()) + otherFields = resolvedFields + queryParser = if (QueryParserHelper.hasAllFieldsWildcard(qsqBuilder.fields().keys)) { + QueryStringQueryParserExt(context, resolvedFields, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else { + QueryStringQueryParserExt(context, resolvedFields, isLenient) + } + } else { + val defaultFields: List = context.defaultFields() + queryParser = if (QueryParserHelper.hasAllFieldsWildcard(defaultFields)) { + otherFields = resolveMatchPatternFields(context) + QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient()) + } else { + val resolvedFields = QueryParserHelper.resolveMappingFields( + context, + QueryParserHelper.parseFieldsAndWeights(defaultFields) + ) + otherFields = resolvedFields + QueryStringQueryParserExt(context, resolvedFields, isLenient) + } + } + + if (qsqBuilder.analyzer() != null) { + val namedAnalyzer: NamedAnalyzer = context.getIndexAnalyzers().get(qsqBuilder.analyzer()) + ?: throw QueryShardException(context, "[query_string] analyzer [$qsqBuilder.analyzer] not found") + queryParser.setForceAnalyzer(namedAnalyzer) + } + + if (qsqBuilder.quoteAnalyzer() != null) { + val forceQuoteAnalyzer: NamedAnalyzer = context.getIndexAnalyzers().get(qsqBuilder.quoteAnalyzer()) + ?: throw QueryShardException(context, "[query_string] quote_analyzer [$qsqBuilder.quoteAnalyzer] not found") + queryParser.setForceQuoteAnalyzer(forceQuoteAnalyzer) + } + + queryParser.defaultOperator = qsqBuilder.defaultOperator().toQueryParserOperator() + // TODO can we extract this somehow? There's no getter for this + queryParser.setType(QueryStringQueryBuilder.DEFAULT_TYPE) + if (qsqBuilder.tieBreaker() != null) { + queryParser.setGroupTieBreaker(qsqBuilder.tieBreaker()) + } else { + queryParser.setGroupTieBreaker(QueryStringQueryBuilder.DEFAULT_TYPE.tieBreaker()) + } + queryParser.phraseSlop = qsqBuilder.phraseSlop() + queryParser.setQuoteFieldSuffix(qsqBuilder.quoteFieldSuffix()) + queryParser.allowLeadingWildcard = + if (qsqBuilder.allowLeadingWildcard() == null) context.queryStringAllowLeadingWildcard() + else qsqBuilder.allowLeadingWildcard() + queryParser.setAnalyzeWildcard( + if (qsqBuilder.analyzeWildcard() == null) context.queryStringAnalyzeWildcard() + else qsqBuilder.analyzeWildcard() + ) + queryParser.enablePositionIncrements = qsqBuilder.enablePositionIncrements() + queryParser.setFuzziness(qsqBuilder.fuzziness()) + queryParser.fuzzyPrefixLength = qsqBuilder.fuzzyPrefixLength() + queryParser.setFuzzyMaxExpansions(qsqBuilder.fuzzyMaxExpansions()) + queryParser.setFuzzyRewriteMethod(QueryParsers.parseRewriteMethod(qsqBuilder.fuzzyRewrite(), LoggingDeprecationHandler.INSTANCE)) + queryParser.multiTermRewriteMethod = QueryParsers.parseRewriteMethod(qsqBuilder.rewrite(), LoggingDeprecationHandler.INSTANCE) + queryParser.setTimeZone(qsqBuilder.timeZone()) + queryParser.determinizeWorkLimit = qsqBuilder.maxDeterminizedStates() + queryParser.autoGenerateMultiTermSynonymsPhraseQuery = qsqBuilder.autoGenerateSynonymsPhraseQuery() + queryParser.setFuzzyTranspositions(qsqBuilder.fuzzyTranspositions()) + + try { + queryParser.parse(rewrittenQueryString) + } catch (e: ParseException) { + throw IllegalArgumentException("Failed to parse query [" + qsqBuilder.queryString() + "]", e) + } + // Return discovered fields + return queryParser.discoveredFields to if (queryParser.hasLonelyTerms) otherFields else mapOf() + } + + @Suppress("EmptyCatchBlock", "LoopWithTooManyJumpStatements") + fun resolveMatchPatternFields( + context: QueryShardContext, + pattern: String = "*" + ): Map { + val allFields = context.simpleMatchToIndexNames(pattern) + val fields: MutableMap = HashMap() + for (fieldName in allFields) { + val fieldType = context.mapperService.fieldType(fieldName) ?: continue + if (fieldType.name().startsWith("_")) { + // Ignore metadata fields + continue + } + + try { + fieldType.termQuery("", context) + } catch (e: QueryShardException) { + // field type is never searchable with term queries (eg. geo point): ignore + continue + } catch (e: UnsupportedOperationException) { + continue + } catch (e: IllegalArgumentException) { + // other exceptions are parsing errors or not indexed fields: keep + } catch (e: OpenSearchParseException) { + } + + // Deduplicate aliases and their concrete fields. + val resolvedFieldName = fieldType.name() + if (allFields.contains(resolvedFieldName)) { + fields[fieldName] = 1.0f + } + } + return fields + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt new file mode 100644 index 000000000..b34b535a0 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.util + +import org.opensearch.Version +import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.regex.Regex +import org.opensearch.common.settings.IndexScopedSettings +import org.opensearch.common.settings.Settings +import org.opensearch.common.settings.SettingsModule +import org.opensearch.common.util.BigArrays +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.env.Environment +import org.opensearch.index.Index +import org.opensearch.index.IndexSettings +import org.opensearch.index.mapper.MapperService +import org.opensearch.index.query.QueryShardContext +import org.opensearch.index.similarity.SimilarityService +import org.opensearch.indices.IndicesModule +import org.opensearch.indices.analysis.AnalysisModule +import org.opensearch.plugins.MapperPlugin +import org.opensearch.plugins.PluginsService +import org.opensearch.script.ScriptService +import java.time.Instant + +/** + * Creates QueryShardContext object which is used in QueryStringQuery rewrite. + * We need this because we have to use QueryStringQueryParser class which requires QueryShardContext as parameter + */ +object QueryShardContextFactory { + lateinit var client: Client + lateinit var clusterService: ClusterService + lateinit var scriptService: ScriptService + lateinit var xContentRegistry: NamedXContentRegistry + lateinit var namedWriteableRegistry: NamedWriteableRegistry + lateinit var environment: Environment + + @Suppress("LongParameterList") + fun init( + client: Client, + clusterService: ClusterService, + scriptService: ScriptService, + xContentRegistry: NamedXContentRegistry, + namedWriteableRegistry: NamedWriteableRegistry, + environment: Environment + ) { + this.client = client + this.clusterService = clusterService + this.scriptService = scriptService + this.xContentRegistry = xContentRegistry + this.namedWriteableRegistry = namedWriteableRegistry + this.environment = environment + } + + private fun getIndexSettingsAndMetadata(indexName: String?): Triple { + val index: Index? + val indexSettings: Settings? + val indexMetadata = clusterService.state().metadata.index(indexName) + ?: throw IllegalArgumentException("Can't find IndexMetadata for index: [$indexName]") + index = indexMetadata.index + indexSettings = indexMetadata.settings + return Triple(index, indexSettings, indexMetadata) + } + + fun createShardContext(indexName: String?): QueryShardContext { + val (index, indexSettings, indexMetadata) = getIndexSettingsAndMetadata(indexName) + val nodeSettings = Settings.builder() + .put("node.name", "dummyNodeName") + .put(Environment.PATH_HOME_SETTING.key, environment.tmpFile()) + .build() + val pluginsService = + PluginsService(nodeSettings, null, null, null, listOf()) + val additionalSettings = pluginsService.pluginSettings + val settingsModule = SettingsModule( + nodeSettings, + additionalSettings, + pluginsService.pluginSettingsFilter, emptySet() + ) + val indexScopedSettings: IndexScopedSettings = settingsModule.indexScopedSettings + val idxSettings = newIndexSettings(index, indexSettings, indexScopedSettings) + val indicesModule = IndicesModule(pluginsService.filterPlugins(MapperPlugin::class.java)) + val mapperRegistry = indicesModule.mapperRegistry + val analysisModule = AnalysisModule(environment, emptyList()) + val indexAnalyzers = analysisModule.analysisRegistry.build(idxSettings) + val similarityService = SimilarityService(idxSettings, null, emptyMap()) + val mapperService = MapperService( + idxSettings, + indexAnalyzers, + xContentRegistry, + similarityService, + mapperRegistry, + { createShardContext(null) }, + { false }, + scriptService + ) + // In order to be able to call toQuery method on QueryBuilder, we need to setup mappings in MapperService + mapperService.merge("_doc", indexMetadata?.mapping()?.source(), MapperService.MergeReason.MAPPING_UPDATE) + + return QueryShardContext( + 0, + idxSettings, + BigArrays.NON_RECYCLING_INSTANCE, + null, + null, + mapperService, + null, + scriptService, + xContentRegistry, + namedWriteableRegistry, + null, + null, + { Instant.now().toEpochMilli() }, + null, + { pattern -> Regex.simpleMatch(pattern, index?.name) }, + { true }, + null + ) + } + + private fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? { + val build = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(settings) + .build() + val metadata = IndexMetadata.builder(index?.name).settings(build).build() + return IndexSettings(metadata, Settings.EMPTY, indexScopedSettings) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 7ca16858c..e181d32af 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -24,6 +24,7 @@ import org.opensearch.index.query.DisMaxQueryBuilder import org.opensearch.index.query.MatchAllQueryBuilder import org.opensearch.index.query.MatchPhraseQueryBuilder import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryStringQueryBuilder import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.query.TermsQueryBuilder @@ -41,6 +42,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Max import org.opensearch.indexmanagement.rollup.model.metric.Min import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.util.IndexUtils @@ -285,7 +287,11 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag } @Suppress("ComplexMethod", "LongMethod") -fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeMap: Map): QueryBuilder { +fun Rollup.rewriteQueryBuilder( + queryBuilder: QueryBuilder, + fieldNameMappingTypeMap: Map, + concreteIndexName: String = "" +): QueryBuilder { return when (queryBuilder) { is TermQueryBuilder -> { val updatedFieldName = queryBuilder.fieldName() + "." + Dimension.Type.TERMS.type @@ -315,19 +321,19 @@ fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeM is BoolQueryBuilder -> { val newBoolQueryBuilder = BoolQueryBuilder() queryBuilder.must()?.forEach { - val newMustQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) + val newMustQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) newBoolQueryBuilder.must(newMustQueryBuilder) } queryBuilder.mustNot()?.forEach { - val newMustNotQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) + val newMustNotQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) newBoolQueryBuilder.mustNot(newMustNotQueryBuilder) } queryBuilder.should()?.forEach { - val newShouldQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) + val newShouldQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) newBoolQueryBuilder.should(newShouldQueryBuilder) } queryBuilder.filter()?.forEach { - val newFilterQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) + val newFilterQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) newBoolQueryBuilder.filter(newFilterQueryBuilder) } newBoolQueryBuilder.minimumShouldMatch(queryBuilder.minimumShouldMatch()) @@ -336,22 +342,24 @@ fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeM newBoolQueryBuilder.boost(queryBuilder.boost()) } is BoostingQueryBuilder -> { - val newPositiveQueryBuilder = this.rewriteQueryBuilder(queryBuilder.positiveQuery(), fieldNameMappingTypeMap) - val newNegativeQueryBuilder = this.rewriteQueryBuilder(queryBuilder.negativeQuery(), fieldNameMappingTypeMap) + val newPositiveQueryBuilder = this.rewriteQueryBuilder(queryBuilder.positiveQuery(), fieldNameMappingTypeMap, concreteIndexName) + val newNegativeQueryBuilder = this.rewriteQueryBuilder(queryBuilder.negativeQuery(), fieldNameMappingTypeMap, concreteIndexName) val newBoostingQueryBuilder = BoostingQueryBuilder(newPositiveQueryBuilder, newNegativeQueryBuilder) if (queryBuilder.negativeBoost() >= 0) newBoostingQueryBuilder.negativeBoost(queryBuilder.negativeBoost()) newBoostingQueryBuilder.queryName(queryBuilder.queryName()) newBoostingQueryBuilder.boost(queryBuilder.boost()) } is ConstantScoreQueryBuilder -> { - val newInnerQueryBuilder = this.rewriteQueryBuilder(queryBuilder.innerQuery(), fieldNameMappingTypeMap) + val newInnerQueryBuilder = this.rewriteQueryBuilder(queryBuilder.innerQuery(), fieldNameMappingTypeMap, concreteIndexName) val newConstantScoreQueryBuilder = ConstantScoreQueryBuilder(newInnerQueryBuilder) newConstantScoreQueryBuilder.boost(queryBuilder.boost()) newConstantScoreQueryBuilder.queryName(queryBuilder.queryName()) } is DisMaxQueryBuilder -> { val newDisMaxQueryBuilder = DisMaxQueryBuilder() - queryBuilder.innerQueries().forEach { newDisMaxQueryBuilder.add(this.rewriteQueryBuilder(it, fieldNameMappingTypeMap)) } + queryBuilder.innerQueries().forEach { + newDisMaxQueryBuilder.add(this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)) + } newDisMaxQueryBuilder.tieBreaker(queryBuilder.tieBreaker()) newDisMaxQueryBuilder.queryName(queryBuilder.queryName()) newDisMaxQueryBuilder.boost(queryBuilder.boost()) @@ -362,14 +370,17 @@ fun Rollup.rewriteQueryBuilder(queryBuilder: QueryBuilder, fieldNameMappingTypeM newMatchPhraseQueryBuilder.queryName(queryBuilder.queryName()) newMatchPhraseQueryBuilder.boost(queryBuilder.boost()) } + is QueryStringQueryBuilder -> { + QueryStringQueryUtil.rewriteQueryStringQuery(queryBuilder, concreteIndexName) + } // We do nothing otherwise, the validation logic should have already verified so not throwing an exception else -> queryBuilder } } -fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder): QueryBuilder { +fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder, targetIndexName: String = ""): QueryBuilder { val wrappedQueryBuilder = BoolQueryBuilder() - wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap)) + wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap, targetIndexName)) wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id })) wrappedQueryBuilder.minimumShouldMatch(1) return wrappedQueryBuilder @@ -391,7 +402,11 @@ fun Rollup.populateFieldMappings(): Set { // TODO: Not a fan of this.. but I can't find a way to overwrite the aggregations on the shallow copy or original // so we need to instantiate a new one so we can add the rewritten aggregation builders @Suppress("ComplexMethod") -fun SearchSourceBuilder.rewriteSearchSourceBuilder(jobs: Set, fieldNameMappingTypeMap: Map): SearchSourceBuilder { +fun SearchSourceBuilder.rewriteSearchSourceBuilder( + jobs: Set, + fieldNameMappingTypeMap: Map, + concreteIndexName: String +): SearchSourceBuilder { val ssb = SearchSourceBuilder() // can use first() here as all jobs in the set will have a superset of the query's terms this.aggregations()?.aggregatorFactories?.forEach { ssb.aggregation(jobs.first().rewriteAggregationBuilder(it)) } @@ -406,7 +421,7 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(jobs: Set, fieldNameM if (this.minScore() != null) ssb.minScore(this.minScore()) if (this.postFilter() != null) ssb.postFilter(this.postFilter()) ssb.profile(this.profile()) - if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query())) + if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query(), concreteIndexName)) this.rescores()?.forEach { ssb.addRescorer(it) } this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) } if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter()) @@ -425,8 +440,12 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder(jobs: Set, fieldNameM return ssb } -fun SearchSourceBuilder.rewriteSearchSourceBuilder(job: Rollup, fieldNameMappingTypeMap: Map): SearchSourceBuilder { - return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap) +fun SearchSourceBuilder.rewriteSearchSourceBuilder( + job: Rollup, + fieldNameMappingTypeMap: Map, + concreteIndexName: String +): SearchSourceBuilder { + return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap, concreteIndexName) } fun Rollup.getInitialDocValues(docCount: Long): MutableMap = diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index 31a7ea367..3e6288cc7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -23,7 +23,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import java.nio.ByteBuffer import java.util.Base64 -@Suppress("UtilityClassWithPublicConstructor") +@Suppress("UtilityClassWithPublicConstructor", "TooManyFunctions") class IndexUtils { companion object { @Suppress("ObjectPropertyNaming") @@ -193,5 +193,65 @@ class IndexUtils { val byteArray = ByteBuffer.allocate(BYTE_ARRAY_SIZE).putLong(hash.h1).putLong(hash.h2).array() return Base64.getUrlEncoder().withoutPadding().encodeToString(byteArray) } + + fun isDataStream(name: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata.dataStreams().containsKey(name) + } + + fun isAlias(indexName: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata.hasAlias(indexName) + } + + fun getWriteIndex(indexName: String?, clusterState: ClusterState): String? { + if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) { + val writeIndexMetadata = clusterState.metadata + .indicesLookup[indexName]!!.writeIndex + if (writeIndexMetadata != null) { + return writeIndexMetadata.index.name + } + } + return null + } + + fun getNewestIndexByCreationDate(concreteIndices: Array, clusterState: ClusterState): String { + val lookup = clusterState.metadata.indicesLookup + var maxCreationDate = Long.MIN_VALUE + var newestIndex: String = concreteIndices[0] + for (indexName in concreteIndices) { + val index = lookup[indexName] + val indexMetadata = clusterState.metadata.index(indexName) + if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) { + if (indexMetadata.creationDate > maxCreationDate) { + maxCreationDate = indexMetadata.creationDate + newestIndex = indexName + } + } + } + return newestIndex + } + + fun isConcreteIndex(indexName: String?, clusterState: ClusterState): Boolean { + return clusterState.metadata + .indicesLookup[indexName]!!.type == IndexAbstraction.Type.CONCRETE_INDEX + } + + fun getConcreteIndex(indexName: String, concreteIndices: Array, clusterState: ClusterState): String { + + if (concreteIndices.isEmpty()) { + throw IllegalArgumentException("ConcreteIndices list can't be empty!") + } + + var concreteIndexName: String + if (concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState)) { + concreteIndexName = indexName + } else if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) { + concreteIndexName = getWriteIndex(indexName, clusterState) + ?: getNewestIndexByCreationDate(concreteIndices, clusterState) // + } else { + concreteIndexName = getNewestIndexByCreationDate(concreteIndices, clusterState) + } + + return concreteIndexName + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index f680a3a17..9c33dc8bc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -7,11 +7,14 @@ package org.opensearch.indexmanagement.rollup import org.apache.http.HttpEntity import org.apache.http.HttpHeaders +import org.apache.http.HttpStatus import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.junit.After import org.junit.AfterClass import org.junit.Before +import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient @@ -50,6 +53,45 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { } } + @After + @Suppress("UNCHECKED_CAST") + fun KillAllCancallableRunningTasks() { + client().makeRequest("POST", "_tasks/_cancel?actions=*") + waitFor { + val response = client().makeRequest("GET", "_tasks") + val nodes = response.asMap()["nodes"] as Map + var hasCancallableRunningTasks = false + nodes.forEach { + val tasks = (it.value as Map)["tasks"] as Map + tasks.forEach { e -> + if ((e.value as Map)["cancellable"] as Boolean) { + hasCancallableRunningTasks = true + } + } + } + assertFalse(hasCancallableRunningTasks) + } + } + + @Suppress("UNCHECKED_CAST") + fun waitForCancallableTasksToFinish() { + waitFor { + val response = client().makeRequest("GET", "_tasks") + val nodes = response.asMap()["nodes"] as Map + var hasCancallableRunningTasks = false + nodes.forEach { + val tasks = (it.value as Map)["tasks"] as Map + tasks.forEach { e -> + if ((e.value as Map)["cancellable"] as Boolean) { + hasCancallableRunningTasks = true + logger.info("cancellable task running: ${e.key}") + } + } + } + assertFalse(hasCancallableRunningTasks) + } + } + @Before fun setDebugLogLevel() { client().makeRequest( @@ -278,4 +320,96 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { ) assertEquals("Request failed", RestStatus.OK, res.restStatus()) } + + protected fun createSampleIndexForQSQTest(index: String) { + val mapping = """ + "properties": { + "event_ts": { + "type": "date" + }, + "test": { + "properties": { + "fff": { + "type": "keyword" + }, + "vvv": { + "type": "keyword" + } + } + }, + "state": { + "type": "keyword" + }, + "state_ext": { + "type": "keyword" + }, + "state_ext2": { + "type": "keyword" + }, + "state_ordinal": { + "type": "long" + }, + "abc test": { + "type": "long" + }, + "earnings": { + "type": "long" + } + + } + """.trimIndent() + createIndex(index, Settings.EMPTY, mapping) + + for (i in 1..5) { + val doc = """ + { + "event_ts": "2019-01-01T12:10:30Z", + "test.fff": "12345", + "test.vvv": "54321", + "state": "TX", + "state_ext": "CA", + "state_ext2": "TX", + "abc test": 123, + "state_ordinal": ${i % 3}, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + for (i in 6..8) { + val doc = """ + { + "event_ts": "2019-01-01T12:10:30Z", + "state": "TA", + "state_ext": "SE", + "state_ext2": "CA", + "state_ordinal": ${i % 3}, + "abc test": 123, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + for (i in 9..11) { + val doc = """ + { + "event_ts": "2019-01-02T12:10:30Z", + "state": "CA", + "state_ext": "MA", + "state_ext2": "CA", + "state_ordinal": ${i % 3}, + "abc test": 123, + "earnings": $i + } + """.trimIndent() + indexDoc(index, "$i", doc) + } + } + + protected fun indexDoc(index: String, id: String, doc: String) { + val request = Request("POST", "$index/_doc/$id/?refresh=true") + request.setJsonEntity(doc) + val resp = client().performRequest(request) + assertEquals(HttpStatus.SC_CREATED, resp.restStatus().status) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index ff13584ff..316291db8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1077,4 +1077,637 @@ class RollupInterceptorIT : RollupRestTestCase() { assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) } } + + fun `test roll up search query_string query`() { + val sourceIndex = "source_rollup_search_qsq_1" + val targetIndex = "target_rollup_qsq_search_1" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "basic_query_string_query_rollup_search111", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), + Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 0", + "default_field": "state_ordinal" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // Fuzzy query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX~2" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Prefix query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:T*" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Regex query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:/[A-Z]T/" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Range query + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state_ordinal:[0 TO 10]" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + // Query with field prefix + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "123", + "default_field":"abc*" + } + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // Using ALL_MATCH_PATTERN for default_field but rollup job didn't include all fields + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "default_field": "*" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // Using ALL_MATCH_PATTERN in one of fields in "fields" array but rollup job didn't include all fields + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "fields": ["state", "*"] + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // field from "fields" list is missing in rollup + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345", + "fields": ["test.fff"] + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue(e.message?.contains("[missing field test.fff]") ?: false) + } + + // no fields or default_field present. Fallback on index setting [index.query.default_field] default value: "*" + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 12345" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + } catch (e: ResponseException) { + assertTrue( + e.message?.contains( + "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + ) ?: false + ) + } + + // fallback on index settings index.query.default_field:state_ordinal + client().makeRequest( + "PUT", "$sourceIndex/_settings", + StringEntity( + """ + { + "index": { + "query": { + "default_field":"state_ordinal" + } + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + // + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 7" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // prefix pattern in "default_field" field + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "TX AND CA", + "default_field": "state_e*" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // field with space in query: + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "abc\\ test:123" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + + // _exists_:field + req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "_exists_:abc\\ test" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + rawAggRes = rawRes.asMap()["aggregations"] as Map> + rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + } + + fun `test roll up search query_string query invalid query`() { + val sourceIndex = "source_rollup_search_qsq_2" + val targetIndex = "target_rollup_qsq_search_2" + generateNYCTaxiData(sourceIndex) + val rollup = Rollup( + id = "basic_query_string_query_rollup_search_2", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Invalid query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "::!invalid+-+-::query:::" + } + }, + "aggs": { + "min_passenger_count": { + "sum": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + fail("search should've failed due to incorrect query") + } catch (e: ResponseException) { + assertTrue("The query_string query wasn't invalid", e.message!!.contains("Failed to parse query")) + } + } + + fun `test roll up search query_string query unknown field`() { + val sourceIndex = "source_rollup_search_qsq" + val targetIndex = "target_rollup_qsq_search" + generateNYCTaxiData(sourceIndex) + val rollup = Rollup( + id = "basic_query_string_query_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "RatecodeID:>=1 AND unknown_field:<=10" + } + }, + "aggs": { + "min_passenger_count": { + "sum": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + fail("search should've failed due to incorrect query") + } catch (e: ResponseException) { + assertTrue("The query_string query field check failed!", e.message!!.contains("Could not find a rollup job that can answer this query because [missing field unknown_field]")) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 72727fffe..30eb73d80 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -204,7 +204,7 @@ class RollupRunnerIT : RollupRestTestCase() { sourceIndex = indexName, targetIndex = "${indexName}_target", metadataID = null, - continuous = true + continuous = false ) // Create source index @@ -222,24 +222,31 @@ class RollupRunnerIT : RollupRestTestCase() { updateRollupStartTime(rollup) var previousRollupMetadata: RollupMetadata? = null - waitFor { + rollup = waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job not found", rollupJob) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + assertFalse("Rollup job is still enabled", rollupJob.enabled) previousRollupMetadata = getRollupMetadata(rollupJob.metadataID!!) assertNotNull("Rollup metadata not found", previousRollupMetadata) - assertEquals("Unexpected metadata status", RollupMetadata.Status.INIT, previousRollupMetadata!!.status) + assertEquals("Unexpected metadata status", RollupMetadata.Status.FINISHED, previousRollupMetadata!!.status) + rollupJob } - // Delete rollup metadata assertNotNull("Previous rollup metadata was not saved", previousRollupMetadata) deleteRollupMetadata(previousRollupMetadata!!.id) - // Update rollup start time to run second execution + // Enable rollup and Update start time to run second execution + client().makeRequest( + "PUT", + "$ROLLUP_JOBS_BASE_URI/${rollup.id}?if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}", + emptyMap(), rollup.copy(enabled = true, jobEnabledTime = Instant.now()).toHttpEntity() + ) + updateRollupStartTime(rollup) - waitFor() { + waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job not found", rollupJob) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) @@ -861,16 +868,13 @@ class RollupRunnerIT : RollupRestTestCase() { assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) rollupJob } var rollupMetadataID = startedRollup.metadataID!! var rollupMetadata = getRollupMetadata(rollupMetadataID) assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) - // TODO Flaky: version conflict could happen here - // From log diving, it seems to be a race condition coming from RollupRunner - // (need more dive to understand rollup business logic) - // There are indexRollup happened between get and enable // restart job client().makeRequest( "PUT",