diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java index 1a40f4bea425d..44fa62fef3fa0 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java @@ -217,6 +217,9 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurr } InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata()); + Integer size = reduceContext.builder() instanceof TimeSeriesAggregationBuilder + ? ((TimeSeriesAggregationBuilder) reduceContext.builder()).getSize() + : null; // tests may use a fake builder List bucketsWithSameKey = new ArrayList<>(aggregations.size()); BytesRef prevTsid = null; while (pq.size() > 0) { @@ -247,6 +250,9 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurr BytesRef tsid = reducedBucket.key; assert prevTsid == null || tsid.compareTo(prevTsid) > 0; reduced.buckets.add(reducedBucket); + if (size != null && reduced.buckets.size() >= size) { + break; + } prevTsid = tsid; } return reduced; diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java index 10753ebd45e8a..405a382373752 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java @@ -15,6 +15,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.xcontent.InstantiatingObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -30,9 +31,13 @@ public class TimeSeriesAggregationBuilder extends AbstractAggregationBuilder { public static final String NAME = "time_series"; public static final ParseField KEYED_FIELD = new ParseField("keyed"); + public static final ParseField SIZE_FIELD = new ParseField("size"); public static final InstantiatingObjectParser PARSER; private boolean keyed; + private int size; + + private static final int DEFAULT_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; static { InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( @@ -41,17 +46,23 @@ public class TimeSeriesAggregationBuilder extends AbstractAggregationBuilder metadata) throws IOException { - return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata); + return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata, size); } } diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java index d30a825264d01..6930b0579a897 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java @@ -30,6 +30,7 @@ public class TimeSeriesAggregator extends BucketsAggregator { protected final BytesKeyedBucketOrds bucketOrds; private final boolean keyed; + private final int size; public TimeSeriesAggregator( String name, @@ -38,11 +39,13 @@ public TimeSeriesAggregator( AggregationContext context, Aggregator parent, CardinalityUpperBound bucketCardinality, - Map metadata + Map metadata, + int size ) throws IOException { super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata); this.keyed = keyed; bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), bucketCardinality); + this.size = size; } @Override @@ -66,6 +69,9 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I ); bucket.bucketOrd = ordsEnum.ord(); buckets.add(bucket); + if (buckets.size() >= size) { + break; + } } allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]); } diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java index 2c87c9f298dca..eb738dc3fe828 100644 --- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java @@ -14,7 +14,8 @@ public class TimeSeriesAggregationBuilderTests extends AggregationBuilderTestCas @Override protected TimeSeriesAggregationBuilder createTestAggregatorBuilder() { - return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean()); + // Size set large enough tests not intending to hit the size limit shouldn't see it. + return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean(), randomIntBetween(1000, 100_000)); } } diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java index 2dc6676d9c4d1..20d2b45158b36 100644 --- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java @@ -180,6 +180,45 @@ public void testMultiBucketAggregationAsSubAggregation() throws IOException { timeSeriesTestCase(tsBuilder, new MatchAllDocsQuery(), buildIndex, verifier); } + public void testAggregationSize() throws IOException { + CheckedConsumer buildIndex = multiTsWriter(); + + List> verifiers = new ArrayList>(); + + verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L))); + verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L))); + verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(2L))); + + for (int i = 1; i < 3; i++) { + int size = i; + Consumer limitedVerifier = ts -> { + assertThat(ts.getBuckets(), hasSize(size)); + + for (int j = 0; j < size; j++) { + verifiers.get(j).accept(ts); + } + }; + + TimeSeriesAggregationBuilder limitedTsBuilder = new TimeSeriesAggregationBuilder("by_tsid"); + limitedTsBuilder.setSize(i); + timeSeriesTestCase(limitedTsBuilder, new MatchAllDocsQuery(), buildIndex, limitedVerifier); + } + } + + private CheckedConsumer multiTsWriter() { + long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z"); + return iw -> { + writeTS(iw, startTime + 1, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 1 }); + writeTS(iw, startTime + 2, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 2 }); + writeTS(iw, startTime + 3, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 3 }); + writeTS(iw, startTime + 4, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 4 }); + writeTS(iw, startTime + 5, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 5 }); + writeTS(iw, startTime + 6, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 6 }); + writeTS(iw, startTime + 7, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 7 }); + writeTS(iw, startTime + 8, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 8 }); + }; + } + private void timeSeriesTestCase( TimeSeriesAggregationBuilder builder, Query query, diff --git a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml index 613106257c933..01829a5c12041 100644 --- a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml +++ b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/time_series.yml @@ -54,7 +54,7 @@ setup: --- "Basic test": - skip: - version: " - 8.5.99" + version: " - 8.6.99" reason: Time series result serialization changed in 8.6.0 - do: @@ -74,11 +74,55 @@ setup: - match: { hits.total.value: 1 } - - length: { aggregations.ts.buckets: 1 } + - length: { aggregations: 1 } - match: { aggregations.ts.buckets.0.key: { "key": "foo" } } - match: { aggregations.ts.buckets.0.doc_count: 1 } +--- +"Size test": + - skip: + version: " - 8.6.99" + reason: Size added in 8.7.0 + + - do: + search: + index: tsdb + body: + query: + range: + "@timestamp": + gte: "2019-01-01T00:10:00Z" + size: 0 + aggs: + ts: + time_series: + keyed: false + size: 1 + + - length: { aggregations.ts.buckets: 1 } + - match: { aggregations.ts.buckets.0.key: { "key": "bar" } } + + - do: + search: + index: tsdb + body: + query: + range: + "@timestamp": + gte: "2019-01-01T00:10:00Z" + size: 0 + aggs: + ts: + time_series: + keyed: false + size: 3 + + - length: { aggregations.ts.buckets: 3 } + - match: { aggregations.ts.buckets.0.key: { "key": "bar" } } + - match: { aggregations.ts.buckets.1.key: { "key": "baz" } } + - match: { aggregations.ts.buckets.2.key: { "key": "foo" } } + --- "Sampler aggregation with nested time series aggregation failure": - skip: