Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Star Tree] [Search] Support for metric aggregations with/without term query #16416

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.compositeindex.datacube;

import org.apache.lucene.index.DocValuesType;
import org.opensearch.common.Rounding;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.time.DateUtils;
Expand Down Expand Up @@ -169,4 +170,8 @@
public static List<DateTimeUnitRounding> getSortedDateTimeUnits(List<DateTimeUnitRounding> dateTimeUnits) {
return dateTimeUnits.stream().sorted(new DateTimeUnitComparator()).collect(Collectors.toList());
}

public DocValuesType getDocValuesType() {
return DocValuesType.SORTED_NUMERIC;

Check warning on line 175 in server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java#L175

Added line #L175 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.compositeindex.datacube;

import org.apache.lucene.index.DocValuesType;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.xcontent.ToXContent;

Expand Down Expand Up @@ -42,4 +43,6 @@ public interface Dimension extends ToXContent {
* Returns the list of dimension fields that represent the dimension
*/
List<String> getSubDimensionNames();

DocValuesType getDocValuesType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.compositeindex.datacube;

import org.apache.lucene.index.DocValuesType;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
Expand Down Expand Up @@ -71,4 +72,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(field);
}

@Override
public DocValuesType getDocValuesType() {
return DocValuesType.SORTED_NUMERIC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.compositeindex.datacube;

import org.apache.lucene.index.DocValuesType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;

Expand Down Expand Up @@ -69,4 +70,9 @@
public int hashCode() {
return Objects.hash(field);
}

@Override
public DocValuesType getDocValuesType() {
return DocValuesType.SORTED_NUMERIC;

Check warning on line 76 in server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java#L76

Added line #L76 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.utils;

import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.CompositeIndexReader;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeFilter;
import org.opensearch.search.startree.StarTreeQueryContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* Helper class for building star-tree query
*
* @opensearch.internal
* @opensearch.experimental
*/
public class StarTreeQueryHelper {

/**
* Checks if the search context can be supported by star-tree
*/
public static boolean isStarTreeSupported(SearchContext context) {
return context.aggregations() != null && context.mapperService().isCompositeIndexPresent() && context.parsedPostFilter() == null;
}

/**
* Gets StarTreeQueryContext from the search context and source builder.
* Returns null if the query and aggregation cannot be supported.
*/
public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context, SearchSourceBuilder source) throws IOException {
// Current implementation assumes only single star-tree is supported
CompositeDataCubeFieldType compositeMappedFieldType = (CompositeDataCubeFieldType) context.mapperService()
.getCompositeFieldTypes()
.iterator()
.next();
CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo(
compositeMappedFieldType.name(),
compositeMappedFieldType.getCompositeIndexType()
);

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
if (metricStat == null) {
return null;

Check warning on line 79 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L79

Added line #L79 was not covered by tests
}
}

// need to cache star tree values only for multiple aggregations
boolean cacheStarTreeValues = context.aggregations().factories().getFactories().length > 1;
int cacheSize = cacheStarTreeValues ? context.indexShard().segments(false).size() : -1;

return StarTreeQueryHelper.tryCreateStarTreeQueryContext(starTree, compositeMappedFieldType, source.query(), cacheSize);
}

/**
* Uses query builder and composite index info to form star-tree query context
*/
private static StarTreeQueryContext tryCreateStarTreeQueryContext(
CompositeIndexFieldInfo compositeIndexFieldInfo,
CompositeDataCubeFieldType compositeFieldType,
QueryBuilder queryBuilder,
int cacheStarTreeValuesSize
) {
Map<String, Long> queryMap;
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
queryMap = null;
} else if (queryBuilder instanceof TermQueryBuilder) {
// TODO: Add support for keyword fields
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
// return null for non-numeric fields
return null;

Check warning on line 106 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L106

Added line #L106 was not covered by tests
}

List<String> supportedDimensions = compositeFieldType.getDimensions()
.stream()
.map(Dimension::getField)
.collect(Collectors.toList());
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
if (queryMap == null) {
return null;

Check warning on line 115 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L115

Added line #L115 was not covered by tests
}
} else {
return null;

Check warning on line 118 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L118

Added line #L118 was not covered by tests
}
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
}

/**
* Parse query body to star-tree predicates
* @param queryBuilder to match star-tree supported query shape
* @return predicates to match
*/
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
String field = tq.fieldName();
if (!supportedDimensions.contains(field)) {
return null;

Check warning on line 132 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L132

Added line #L132 was not covered by tests
}
long inputQueryVal = Long.parseLong(tq.value().toString());

// Create a map with the field and the value
Map<String, Long> predicateMap = new HashMap<>();
predicateMap.put(field, inputQueryVal);
return predicateMap;
}

private static MetricStat validateStarTreeMetricSupport(
CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
String field;
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
.stream()
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));

MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
field = ((MetricAggregatorFactory) aggregatorFactory).getField();

if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
return metricStat;
}
}
return null;

Check warning on line 159 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L159

Added line #L159 was not covered by tests
}

public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext();
return (starTreeQueryContext != null) ? starTreeQueryContext.getStarTree() : null;
}

public static StarTreeValues getStarTreeValues(LeafReaderContext context, CompositeIndexFieldInfo starTree) throws IOException {
SegmentReader reader = Lucene.segmentReader(context.reader());
if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) {
return null;

Check warning on line 170 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L170

Added line #L170 was not covered by tests
}
CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
return (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree);
}

/**
* Get the star-tree leaf collector
* This collector computes the aggregation prematurely and invokes an early termination collector
*/
public static LeafBucketCollector getStarTreeLeafCollector(
SearchContext context,
ValuesSource.Numeric valuesSource,
LeafReaderContext ctx,
LeafBucketCollector sub,
CompositeIndexFieldInfo starTree,
String metric,
Consumer<Long> valueConsumer,
Runnable finalConsumer
) throws IOException {
StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree);
assert starTreeValues != null;
String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName();
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, metric);

assert starTreeValues != null;
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(
metricName
);
// Obtain a FixedBitSet of matched star tree document IDs
FixedBitSet filteredValues = getStarTreeFilteredValues(context, ctx, starTreeValues);
assert filteredValues != null;

int numBits = filteredValues.length(); // Get the number of the filtered values (matching docs)
if (numBits > 0) {
// Iterate over the filtered values
for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? filteredValues.nextSetBit(bit + 1)

Check warning on line 207 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java#L207

Added line #L207 was not covered by tests
: DocIdSetIterator.NO_MORE_DOCS) {
// Advance to the entryId in the valuesIterator
if (valuesIterator.advanceExact(bit) == false) {
continue; // Skip if no more entries
}

// Iterate over the values for the current entryId
for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
long value = valuesIterator.nextValue();
valueConsumer.accept(value); // Apply the consumer operation (e.g., max, sum)
}
}
}

// Call the final consumer after processing all entries
finalConsumer.run();

// Return a LeafBucketCollector that terminates collection
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
@Override
public void collect(int doc, long bucket) {
throw new CollectionTerminatedException();
}
};
}

/**
* Get the filtered values for the star-tree query
* Cache the results in case of multiple aggregations (if cache is initialized)
* @return FixedBitSet of matched document IDs
*/
public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafReaderContext ctx, StarTreeValues starTreeValues)
throws IOException {
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
if (result == null) {
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ public SortedNumericStarTreeValuesIterator(DocIdSetIterator docIdSetIterator) {
public long nextValue() throws IOException {
return ((SortedNumericDocValues) docIdSetIterator).nextValue();
}

public int entryValueCount() throws IOException {
return ((SortedNumericDocValues) docIdSetIterator).docValueCount();
}

public boolean advanceExact(int target) throws IOException {
return ((SortedNumericDocValues) docIdSetIterator).advanceExact(target);
}
}
Loading
Loading