Skip to content

Commit

Permalink
Added handling of `_exists_` queries and prefix-pattern fields in `default_field`
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
Petar Dzepina authored and petardz committed Jan 9, 2023
1 parent b96805c commit 82a1490
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.rollup.util.getRollupJobs
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.rollup.util.populateFieldMappings
import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval
Expand Down Expand Up @@ -86,9 +87,13 @@ class RollupInterceptor(
val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
// To extract fields from QueryStringQueryBuilder we need concrete source index name.
val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0)
?: throw IllegalArgumentException("No rollup job associated with target_index")
val queryFieldMappings = getQueryMetadata(request.source().query(), rollupJob.sourceIndex)
val queryFieldMappings = getQueryMetadata(
request.source().query(),
IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state())
)
val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories)
val fieldMappings = queryFieldMappings + aggregationFieldMappings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
package org.opensearch.indexmanagement.rollup.query

import org.apache.lucene.search.Query
import org.opensearch.common.regex.Regex
import org.opensearch.index.query.QueryShardContext
import org.opensearch.index.search.QueryStringQueryParser

const val EXISTS = "_exists_"

class QueryStringQueryParserExt : QueryStringQueryParser {

val discoveredFields = mutableListOf<String>()
Expand All @@ -19,38 +22,40 @@ class QueryStringQueryParserExt : QueryStringQueryParser {
constructor(context: QueryShardContext, resolvedFields: Map<String, Float>, lenient: Boolean) : super(context, resolvedFields, lenient)

override fun getFuzzyQuery(field: String?, termStr: String?, minSimilarity: Float): Query? {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field)
return super.getFuzzyQuery(field, termStr, minSimilarity)
}
override fun getPrefixQuery(field: String?, termStr: String?): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field)
return super.getPrefixQuery(field, termStr)
}
override fun getFieldQuery(field: String?, queryText: String?, quoted: Boolean): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field, queryText)
return super.getFieldQuery(field, queryText, quoted)
}
override fun getWildcardQuery(field: String?, termStr: String?): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field)
return super.getWildcardQuery(field, termStr)
}
override fun getFieldQuery(field: String?, queryText: String?, slop: Int): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field, queryText)
return super.getFieldQuery(field, queryText, slop)
}
override fun getRangeQuery(field: String?, part1: String?, part2: String?, startInclusive: Boolean, endInclusive: Boolean): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field)
return super.getRangeQuery(field, part1, part2, startInclusive, endInclusive)
}
override fun getRegexpQuery(field: String?, termStr: String?): Query {
if (field == null) hasLonelyTerms = true
else if (field != "*") discoveredFields.add(field)
handleFieldQueryDiscovered(field)
return super.getRegexpQuery(field, termStr)
}

/**
 * Records how a parsed query clause referenced its field so the rewrite phase
 * knows which source-index fields the query touches.
 *
 * - A null field or a simple-match pattern (e.g. "*") means the clause had no
 *   concrete field: flag it via [hasLonelyTerms].
 * - An `_exists_` clause carries the real field name in [queryText], so that
 *   value is recorded instead of the literal `_exists_` token.
 * - Otherwise the field name itself is added to [discoveredFields].
 */
private fun handleFieldQueryDiscovered(field: String?, queryText: String? = null) {
    when {
        field == null || Regex.isSimpleMatchPattern(field) -> hasLonelyTerms = true
        field == EXISTS && queryText?.isNotEmpty() == true -> discoveredFields.add(queryText)
        else -> discoveredFields.add(field)
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ object QueryStringQueryUtil {
// Rewrite query_string
var newQueryString = qsqBuilder.queryString()
fieldsFromQueryString.forEach { field ->
newQueryString = newQueryString.replace("$field:", "$field.${Dimension.Type.TERMS.type}:")
val escapedField = escapeSpaceCharacters(field)
newQueryString = newQueryString.replace("$escapedField:", "$escapedField.${Dimension.Type.TERMS.type}:")
newQueryString = newQueryString.replace("$EXISTS:$escapedField", "$EXISTS:$escapedField.${Dimension.Type.TERMS.type}")
}

// We will rewrite here only concrete default fields.
// Prefix ones we will resolve(otherFields) and insert into fields array
var newDefaultField = qsqBuilder.defaultField()
if (newDefaultField != null && newDefaultField != "*") {
if (newDefaultField != null && Regex.isSimpleMatchPattern(newDefaultField) == false) {
newDefaultField = newDefaultField + ".${Dimension.Type.TERMS.type}"
} else {
newDefaultField = null
Expand Down Expand Up @@ -82,6 +86,12 @@ object QueryStringQueryUtil {
}
return retVal
}

/**
 * Escapes every space in [field] with a backslash so the field name can be
 * embedded verbatim in a query_string expression.
 */
private fun escapeSpaceCharacters(field: String): String = field.replace(" ", "\\ ")

@Suppress("ComplexMethod", "LongMethod", "ThrowsCount", "EmptyCatchBlock")
fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair<List<String>, Map<String, Float>> {
val context = QueryShardContextFactory.createShardContext(concreteIndexName)
Expand All @@ -92,8 +102,11 @@ object QueryStringQueryUtil {
var otherFields = mapOf<String, Float>()
if (qsqBuilder.defaultField() != null) {
if (Regex.isMatchAllPattern(qsqBuilder.defaultField())) {
otherFields = resolveMatchAllPatternFields(context)
otherFields = resolveMatchPatternFields(context)
queryParser = QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient())
} else if (Regex.isSimpleMatchPattern(qsqBuilder.defaultField())) {
otherFields = resolveMatchPatternFields(context, qsqBuilder.defaultField())
queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient)
} else {
queryParser = QueryStringQueryParserExt(context, qsqBuilder.defaultField(), isLenient)
}
Expand All @@ -108,7 +121,7 @@ object QueryStringQueryUtil {
} else {
val defaultFields: List<String> = context.defaultFields()
queryParser = if (QueryParserHelper.hasAllFieldsWildcard(defaultFields)) {
otherFields = resolveMatchAllPatternFields(context)
otherFields = resolveMatchPatternFields(context)
QueryStringQueryParserExt(context, if (qsqBuilder.lenient() == null) true else qsqBuilder.lenient())
} else {
val resolvedFields = QueryParserHelper.resolveMappingFields(
Expand Down Expand Up @@ -170,10 +183,11 @@ object QueryStringQueryUtil {
}

@Suppress("EmptyCatchBlock", "LoopWithTooManyJumpStatements")
fun resolveMatchAllPatternFields(
fun resolveMatchPatternFields(
context: QueryShardContext,
pattern: String = "*"
): Map<String, Float> {
val allFields = context.simpleMatchToIndexNames("*")
val allFields = context.simpleMatchToIndexNames(pattern)
val fields: MutableMap<String, Float> = HashMap()
for (fieldName in allFields) {
val fieldType = context.mapperService.fieldType(fieldName) ?: continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import org.opensearch.plugins.PluginsService
import org.opensearch.script.ScriptService
import java.time.Instant

/**
 * Creates the QueryShardContext object used in the QueryStringQuery rewrite.
 * This is required because the rewrite relies on QueryStringQueryParser,
 * which takes a QueryShardContext as a constructor parameter.
 */
object QueryShardContextFactory {
lateinit var client: Client
lateinit var clusterService: ClusterService
Expand All @@ -54,17 +58,13 @@ object QueryShardContextFactory {
this.environment = environment
}

fun getIndexSettingsAndMetadata(indexName: String?): Triple<Index?, Settings?, IndexMetadata?> {
var index: Index?
var indexSettings: Settings?
private fun getIndexSettingsAndMetadata(indexName: String?): Triple<Index?, Settings?, IndexMetadata?> {
val index: Index?
val indexSettings: Settings?
val indexMetadata = clusterService.state().metadata.index(indexName)
if (indexMetadata != null) {
index = indexMetadata.index
indexSettings = indexMetadata.settings
} else {
index = Index("dummyIndexName", "randomindexuuid123456")
indexSettings = Settings.EMPTY
}
?: throw IllegalArgumentException("Can't find IndexMetadata for index: [$indexName]")
index = indexMetadata.index
indexSettings = indexMetadata.settings
return Triple(index, indexSettings, indexMetadata)
}

Expand All @@ -82,7 +82,7 @@ object QueryShardContextFactory {
additionalSettings,
pluginsService.pluginSettingsFilter, emptySet()
)
val indexScopedSettings: IndexScopedSettings = settingsModule.getIndexScopedSettings()
val indexScopedSettings: IndexScopedSettings = settingsModule.indexScopedSettings
val idxSettings = newIndexSettings(index, indexSettings, indexScopedSettings)
val indicesModule = IndicesModule(pluginsService.filterPlugins(MapperPlugin::class.java))
val mapperRegistry = indicesModule.mapperRegistry
Expand Down Expand Up @@ -123,7 +123,7 @@ object QueryShardContextFactory {
)
}

fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? {
private fun newIndexSettings(index: Index?, settings: Settings?, indexScopedSettings: IndexScopedSettings?): IndexSettings? {
val build = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down
60 changes: 60 additions & 0 deletions src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,65 @@ class IndexUtils {
val byteArray = ByteBuffer.allocate(BYTE_ARRAY_SIZE).putLong(hash.h1).putLong(hash.h2).array()
return Base64.getUrlEncoder().withoutPadding().encodeToString(byteArray)
}

/** Returns true when [name] refers to a data stream in the given [clusterState]. */
fun isDataStream(name: String?, clusterState: ClusterState): Boolean =
    clusterState.metadata.dataStreams().containsKey(name)

/** Returns true when [indexName] is an alias in the given [clusterState]. */
fun isAlias(indexName: String?, clusterState: ClusterState): Boolean =
    clusterState.metadata.hasAlias(indexName)

/**
 * Resolves the write index behind an alias or data stream.
 *
 * @return the concrete write-index name, or null when [indexName] is neither
 *         an alias nor a data stream, or when no write index is configured.
 */
fun getWriteIndex(indexName: String?, clusterState: ClusterState): String? {
    if (!isAlias(indexName, clusterState) && !isDataStream(indexName, clusterState)) {
        return null
    }
    // !! is safe here: a name that resolved as an alias/data stream is present in indicesLookup
    return clusterState.metadata.indicesLookup[indexName]!!.writeIndex?.index?.name
}

/**
 * Picks the most recently created concrete index out of [concreteIndices].
 *
 * Entries that are absent from the cluster's indices lookup, are not concrete
 * indices, or have no IndexMetadata are skipped. If no entry qualifies, the
 * first element is returned as a fallback (original behavior preserved).
 *
 * Fixes over the previous version:
 *  - `metadata.index(indexName)` is a Java platform type and can return null
 *    (e.g. state changed between building the lookup and this call); it was
 *    dereferenced unconditionally, risking an NPE. Null now skips the entry.
 *  - An empty input array produced an ArrayIndexOutOfBoundsException on
 *    `concreteIndices[0]`; it now fails fast with a clear message.
 *
 * @throws IllegalArgumentException when [concreteIndices] is empty
 */
fun getNewestIndexByCreationDate(concreteIndices: Array<String>, clusterState: ClusterState): String {
    require(concreteIndices.isNotEmpty()) { "ConcreteIndices list can't be empty!" }
    val lookup = clusterState.metadata.indicesLookup
    var maxCreationDate = Long.MIN_VALUE
    var newestIndex: String = concreteIndices[0]
    for (indexName in concreteIndices) {
        val abstraction = lookup[indexName] ?: continue
        if (abstraction.type != IndexAbstraction.Type.CONCRETE_INDEX) continue
        // Platform type from Java API — may be null even for a known name; skip defensively.
        val indexMetadata = clusterState.metadata.index(indexName) ?: continue
        if (indexMetadata.creationDate > maxCreationDate) {
            maxCreationDate = indexMetadata.creationDate
            newestIndex = indexName
        }
    }
    return newestIndex
}

/**
 * Returns true when [indexName] resolves to a concrete index (not an alias or
 * data stream) in the given [clusterState].
 *
 * Fix: the previous implementation dereferenced the lookup result with `!!`,
 * which threw a NullPointerException for any name absent from indicesLookup.
 * An unknown name now simply yields false.
 */
fun isConcreteIndex(indexName: String?, clusterState: ClusterState): Boolean {
    val abstraction = clusterState.metadata.indicesLookup[indexName]
    return abstraction?.type == IndexAbstraction.Type.CONCRETE_INDEX
}

/**
 * Resolves [indexName] (concrete index, alias, data stream, or pattern) to a
 * single concrete index name.
 *
 * Resolution order:
 *  1. A single matching concrete index resolves to itself.
 *  2. An alias/data stream resolves to its write index, falling back to the
 *     newest backing index by creation date.
 *  3. Anything else (e.g. a wildcard pattern) resolves to the newest matching
 *     index by creation date.
 *
 * @throws IllegalArgumentException when [concreteIndices] is empty
 */
fun getConcreteIndex(indexName: String, concreteIndices: Array<String>, clusterState: ClusterState): String {
    if (concreteIndices.isEmpty()) {
        throw IllegalArgumentException("ConcreteIndices list can't be empty!")
    }
    return when {
        concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState) -> indexName
        isAlias(indexName, clusterState) || isDataStream(indexName, clusterState) ->
            getWriteIndex(indexName, clusterState)
                ?: getNewestIndexByCreationDate(concreteIndices, clusterState)
        else -> getNewestIndexByCreationDate(concreteIndices, clusterState)
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,15 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
"state_ext": {
"type": "keyword"
},
"state_ext2": {
"type": "keyword"
},
"state_ordinal": {
"type": "long"
},
"abc test": {
"type": "long"
},
"earnings": {
"type": "long"
}
Expand All @@ -362,6 +368,8 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
"test.vvv": "54321",
"state": "TX",
"state_ext": "CA",
"state_ext2": "TX",
"abc test": 123,
"state_ordinal": ${i % 3},
"earnings": $i
}
Expand All @@ -374,7 +382,9 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
"event_ts": "2019-01-01T12:10:30Z",
"state": "TA",
"state_ext": "SE",
"state_ordinal": ${i % 3},
"state_ext2": "CA",
"state_ordinal": ${i % 3},
"abc test": 123,
"earnings": $i
}
""".trimIndent()
Expand All @@ -386,7 +396,9 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
"event_ts": "2019-01-02T12:10:30Z",
"state": "CA",
"state_ext": "MA",
"state_ordinal": ${i % 3},
"state_ext2": "CA",
"state_ordinal": ${i % 3},
"abc test": 123,
"earnings": $i
}
""".trimIndent()
Expand Down
Loading

0 comments on commit 82a1490

Please sign in to comment.