[backport to 1.3] Support writing features using filter aggregation (#425) (#560)

Previously, when writing a feature using a filter aggregation, we failed with "EndRunException: Failed to parse aggregation." because the logic to parse the result was missing.

This PR adds the parsing logic.
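
For context, the failing case is a feature defined as a filter aggregation wrapping a single-value metric. Below is a minimal sketch built with the standard OpenSearch aggregation builders; the field and aggregation names mirror the example in the code comment further down and are illustrative only.

import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;

class FilterFeatureExample {
    // A filter aggregation whose sole sub-aggregation is a single-value
    // metric (cardinality). Parsing the response for a feature of this
    // shape is what previously failed.
    static AggregationBuilder filterFeature() {
        return AggregationBuilders
            .filter("t_shirts", QueryBuilders.termQuery("issueType", "foo"))
            .subAggregation(AggregationBuilders.cardinality("impactUniqueAccounts").field("account"));
    }
}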

Testing done:
1. Added unit tests.
2. Ran end-to-end testing.

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Jun 1, 2022
1 parent 264ecc9 commit b62de79
Showing 2 changed files with 104 additions and 1 deletion.
45 changes: 44 additions & 1 deletion src/main/java/org/opensearch/ad/feature/AbstractRetriever.java
@@ -22,6 +22,8 @@
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles;
@@ -30,8 +32,49 @@
import org.opensearch.search.builder.SearchSourceBuilder;

public abstract class AbstractRetriever {
- protected double parseAggregation(Aggregation aggregation) {
+ protected double parseAggregation(Aggregation aggregationToParse) {
Double result = null;
/* example InternalSingleBucketAggregation: filter aggregation like
"t_shirts": {
"filter": {
"bool": {
"should": [
{
"term": {
"issueType": "foo"
}
}
...
],
"minimum_should_match": "1",
"boost": 1
}
},
"aggs": {
"impactUniqueAccounts": {
"aggregation": {
"field": "account"
}
}
}
}
would produce an InternalFilter (a subtype of InternalSingleBucketAggregation) with a sub-aggregation
InternalCardinality that is also a SingleValue
*/

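// Unwrap a single-bucket aggregation (e.g., the filter aggregation above)
// so that its sole sub-aggregation can be parsed as the feature value.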
if (aggregationToParse instanceof InternalSingleBucketAggregation) {
InternalAggregations bucket = ((InternalSingleBucketAggregation) aggregationToParse).getAggregations();
if (bucket != null) {
List<Aggregation> aggrs = bucket.asList();
if (aggrs.size() == 1) {
// we only accept a single value as the feature
aggregationToParse = aggrs.get(0);
}
}
}

final Aggregation aggregation = aggregationToParse;
if (aggregation instanceof SingleValue) {
result = ((SingleValue) aggregation).value();
} else if (aggregation instanceof InternalTDigestPercentiles) {
60 changes: 60 additions & 0 deletions src/test/java/org/opensearch/ad/feature/SearchFeatureDaoTests.java
@@ -19,6 +19,8 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.ZoneOffset;
@@ -63,11 +65,15 @@
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@@ -78,13 +84,21 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.opensearch.search.aggregations.bucket.filter.InternalFilter;
import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
import org.opensearch.search.aggregations.bucket.filter.InternalFilters.InternalBucket;
import org.opensearch.search.aggregations.bucket.range.InternalDateRange;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.AbstractHyperLogLog;
import org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus;
import org.opensearch.search.aggregations.metrics.HyperLogLogPlusPlus;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.aggregations.metrics.InternalMax;
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
import org.opensearch.search.internal.InternalSearchResponse;

import com.carrotsearch.hppc.BitMixer;
import com.google.common.collect.ImmutableList;

/**
@@ -571,4 +585,50 @@ public void testGetColdStartSamplesForPeriodsDefaultFormat() throws IOException,
public void testGetColdStartSamplesForPeriodsRawFormat() throws IOException, InterruptedException {
getColdStartSamplesForPeriodsTemplate(DocValueFormat.RAW);
}

@SuppressWarnings("rawtypes")
public void testParseBuckets() throws InstantiationException,
IllegalAccessException,
IllegalArgumentException,
InvocationTargetException,
NoSuchMethodException,
SecurityException {
// cannot mock final class HyperLogLogPlusPlus
HyperLogLogPlusPlus hllpp = new HyperLogLogPlusPlus(
randomIntBetween(AbstractHyperLogLog.MIN_PRECISION, AbstractHyperLogLog.MAX_PRECISION),
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
1
);
hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));
hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));

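// construct InternalCardinality around the HyperLogLogPlusPlus sketch via reflection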
Constructor ctor = null;
ctor = InternalCardinality.class.getDeclaredConstructor(String.class, AbstractHyperLogLogPlusPlus.class, Map.class);
ctor.setAccessible(true);
InternalCardinality cardinality = (InternalCardinality) ctor.newInstance("impactUniqueAccounts", hllpp, new HashMap<>());

// have to use reflection as none of InternalFilter's constructors are public
ctor = InternalFilter.class.getDeclaredConstructor(String.class, long.class, InternalAggregations.class, Map.class);

ctor.setAccessible(true);
String featureId = "deny_max";
InternalFilter internalFilter = (InternalFilter) ctor
.newInstance(featureId, 100, InternalAggregations.from(Arrays.asList(cardinality)), new HashMap<>());
InternalBucket bucket = new InternalFilters.InternalBucket(
"test",
randomIntBetween(0, 1000),
InternalAggregations.from(Arrays.asList(internalFilter)),
true
);

Optional<double[]> parsedResult = searchFeatureDao.parseBucket(bucket, Arrays.asList(featureId));

assertTrue(parsedResult.isPresent());
double[] parsedCardinality = parsedResult.get();
assertEquals(1, parsedCardinality.length);
assertEquals(2, parsedCardinality[0], 0.001);

// release MockBigArrays; otherwise, the test will fail
Releasables.close(hllpp);
}
}
