From b62de79bc5b6bfcb73a2db1094e79afa85d93470 Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Wed, 1 Jun 2022 16:14:03 -0700
Subject: [PATCH] [backport to 1.3] Support writing features using filter
 aggregation (#425) (#560)

Previously, writing a feature using a filter aggregation failed with
"EndRunException: Failed to parse aggregation." because the logic for
parsing the aggregation result was missing. This PR adds that parsing
logic.

Testing done:
1. added unit tests
2. end-to-end testing
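For reference, a feature aggregation shaped like the one below used to
trigger the exception. This is a minimal sketch modeled on the example
in the code comment in this patch; the names (t_shirts, issueType,
impactUniqueAccounts, account) are illustrative, not taken from a real
detector:

    "t_shirts": {
        "filter": {
            "term": {
                "issueType": "foo"
            }
        },
        "aggs": {
            "impactUniqueAccounts": {
                "cardinality": {
                    "field": "account"
                }
            }
        }
    }

The filter yields a single bucket; the fix unwraps that bucket and
reads its lone single-valued sub-aggregation (here the cardinality) as
the feature value.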
Signed-off-by: Kaituo Li
---
 .../ad/feature/AbstractRetriever.java         | 45 +++++++++++++-
 .../NoPowermockSearchFeatureDaoTests.java     | 60 +++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/opensearch/ad/feature/AbstractRetriever.java b/src/main/java/org/opensearch/ad/feature/AbstractRetriever.java
index dd9aa88b0..3a9a31d2d 100644
--- a/src/main/java/org/opensearch/ad/feature/AbstractRetriever.java
+++ b/src/main/java/org/opensearch/ad/feature/AbstractRetriever.java
@@ -22,6 +22,8 @@
 import org.opensearch.search.aggregations.Aggregation;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.Aggregations;
+import org.opensearch.search.aggregations.InternalAggregations;
+import org.opensearch.search.aggregations.bucket.InternalSingleBucketAggregation;
 import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles;
@@ -30,8 +32,49 @@
 import org.opensearch.search.builder.SearchSourceBuilder;
 
 public abstract class AbstractRetriever {
-    protected double parseAggregation(Aggregation aggregation) {
+    protected double parseAggregation(Aggregation aggregationToParse) {
         Double result = null;
+        /* example InternalSingleBucketAggregation: a filter aggregation like
+        "t_shirts": {
+            "filter": {
+                "bool": {
+                    "should": [
+                        {
+                            "term": {
+                                "issueType": "foo"
+                            }
+                        }
+                        ...
+                    ],
+                    "minimum_should_match": "1",
+                    "boost": 1
+                }
+            },
+            "aggs": {
+                "impactUniqueAccounts": {
+                    "cardinality": {
+                        "field": "account"
+                    }
+                }
+            }
+        }
+
+        would produce an InternalFilter (a subtype of InternalSingleBucketAggregation) whose single
+        sub-aggregation is an InternalCardinality, which is also a SingleValue
+        */
+        if (aggregationToParse instanceof InternalSingleBucketAggregation) {
+            InternalAggregations bucket = ((InternalSingleBucketAggregation) aggregationToParse).getAggregations();
+            if (bucket != null) {
+                List<Aggregation> aggrs = bucket.asList();
+                if (aggrs.size() == 1) {
+                    // we only accept a single value as the feature
+                    aggregationToParse = aggrs.get(0);
+                }
+            }
+        }
+
+        final Aggregation aggregation = aggregationToParse;
         if (aggregation instanceof SingleValue) {
             result = ((SingleValue) aggregation).value();
         } else if (aggregation instanceof InternalTDigestPercentiles) {
diff --git a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
index cd7a08a9a..5005f41da 100644
--- a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
+++ b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java
@@ -19,6 +19,8 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.time.ZoneOffset;
@@ -63,11 +65,15 @@
 import org.opensearch.ad.util.ClientUtil;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.time.DateFormatter;
+import org.opensearch.common.util.MockBigArrays;
+import org.opensearch.common.util.MockPageCacheRecycler;
 import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.indices.breaker.NoneCircuitBreakerService;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.SearchHit;
 import org.opensearch.search.SearchHits;
@@ -78,13 +84,21 @@
 import org.opensearch.search.aggregations.InternalAggregations;
 import org.opensearch.search.aggregations.InternalOrder;
 import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
+import org.opensearch.search.aggregations.bucket.filter.InternalFilter;
+import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
+import org.opensearch.search.aggregations.bucket.filter.InternalFilters.InternalBucket;
 import org.opensearch.search.aggregations.bucket.range.InternalDateRange;
 import org.opensearch.search.aggregations.bucket.terms.StringTerms;
 import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.opensearch.search.aggregations.metrics.AbstractHyperLogLog;
+import org.opensearch.search.aggregations.metrics.AbstractHyperLogLogPlusPlus;
+import org.opensearch.search.aggregations.metrics.HyperLogLogPlusPlus;
+import org.opensearch.search.aggregations.metrics.InternalCardinality;
 import org.opensearch.search.aggregations.metrics.InternalMax;
 import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
 import org.opensearch.search.internal.InternalSearchResponse;
 
+import com.carrotsearch.hppc.BitMixer;
 import com.google.common.collect.ImmutableList;
 
 /**
@@ -571,4 +585,50 @@ public void testGetColdStartSamplesForPeriodsDefaultFormat() throws IOException,
     public void testGetColdStartSamplesForPeriodsRawFormat() throws IOException, InterruptedException {
         getColdStartSamplesForPeriodsTemplate(DocValueFormat.RAW);
     }
+
+    @SuppressWarnings("rawtypes")
+    public void testParseBuckets() throws InstantiationException,
+        IllegalAccessException,
+        IllegalArgumentException,
+        InvocationTargetException,
+        NoSuchMethodException,
+        SecurityException {
+        // cannot mock the final class HyperLogLogPlusPlus
+        HyperLogLogPlusPlus hllpp = new HyperLogLogPlusPlus(
+            randomIntBetween(AbstractHyperLogLog.MIN_PRECISION, AbstractHyperLogLog.MAX_PRECISION),
+            new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
+            1
+        );
+        hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));
+        hllpp.collect(0, BitMixer.mix64(randomIntBetween(101, 200))); // distinct from the first value, so the cardinality is deterministically 2
+
+        Constructor ctor = null;
+        ctor = InternalCardinality.class.getDeclaredConstructor(String.class, AbstractHyperLogLogPlusPlus.class, Map.class);
+        ctor.setAccessible(true);
+        InternalCardinality cardinality = (InternalCardinality) ctor.newInstance("impactUniqueAccounts", hllpp, new HashMap<>());
+
+        // have to use reflection since none of InternalFilter's constructors is public
+        ctor = InternalFilter.class.getDeclaredConstructor(String.class, long.class, InternalAggregations.class, Map.class);
+
+        ctor.setAccessible(true);
+        String featureId = "deny_max";
+        InternalFilter internalFilter = (InternalFilter) ctor
+            .newInstance(featureId, 100, InternalAggregations.from(Arrays.asList(cardinality)), new HashMap<>());
+        InternalBucket bucket = new InternalFilters.InternalBucket(
+            "test",
+            randomIntBetween(0, 1000),
+            InternalAggregations.from(Arrays.asList(internalFilter)),
+            true
+        );
+
+        Optional<double[]> parsedResult = searchFeatureDao.parseBucket(bucket, Arrays.asList(featureId));
+
+        assertTrue(parsedResult.isPresent());
+        double[] parsedCardinality = parsedResult.get();
+        assertEquals(1, parsedCardinality.length);
+        assertEquals(2, parsedCardinality[0], 0.001);
+
+        // release MockBigArrays; otherwise, the test will fail
+        Releasables.close(hllpp);
+    }
 }