Add size parameter to time_series aggregation #93496

Merged · 6 commits · Feb 7, 2023
Changes from 4 commits
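For context, a minimal sketch of the Java API this PR adds (names taken from the diff below): the new size parameter caps how many time-series buckets the aggregation returns.

// Sketch: cap the number of time-series buckets returned by the aggregation.
// TimeSeriesAggregationBuilder and setSize(int) are introduced in this PR.
TimeSeriesAggregationBuilder tsAgg = new TimeSeriesAggregationBuilder("ts").setSize(10);
// Serialized request body (per internalXContent below): { "time_series": { "keyed": true, "size": 10 } }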
5 changes: 5 additions & 0 deletions docs/changelog/93496.yaml
@@ -0,0 +1,5 @@
pr: 93496
summary: Size for time series
area: Geo
type: feature
issues: []
InternalTimeSeries.java
@@ -217,6 +217,9 @@ protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
}

InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
Integer size = reduceContext.builder() instanceof TimeSeriesAggregationBuilder
    ? ((TimeSeriesAggregationBuilder) reduceContext.builder()).getSize()
    : null; // tests may use a fake builder

Review comment (Member):
+1, then we don't have to serialise size as part of the InternalTimeSeries class.
Is reduceContext.builder() instanceof TimeSeriesAggregationBuilder always true?
If so, we should throw an IllegalStateException if the builder isn't an instance of TimeSeriesAggregationBuilder.
If this isn't the case, then we should default to TimeSeriesAggregationBuilder#DEFAULT_SIZE.
I think the size variable can just be an int?

Reply (Member):
> tests may use a fake builder

I was unaware of this. So I think using Integer as the type here is ok.
List<InternalBucket> bucketsWithSameKey = new ArrayList<>(aggregations.size());
BytesRef prevTsid = null;
while (pq.size() > 0) {
@@ -247,6 +250,9 @@ protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
BytesRef tsid = reducedBucket.key;
assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
reduced.buckets.add(reducedBucket);
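// Buckets are merged in ascending tsid order (see the assert above), so once
// `size` buckets have been added, the remaining ones can safely be skipped.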
if (size != null && reduced.buckets.size() >= size) {
break;
}
prevTsid = tsid;
}
return reduced;
TimeSeriesAggregationBuilder.java
@@ -15,6 +15,7 @@
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.xcontent.InstantiatingObjectParser;
import org.elasticsearch.xcontent.ParseField;
@@ -30,9 +31,13 @@
public class TimeSeriesAggregationBuilder extends AbstractAggregationBuilder<TimeSeriesAggregationBuilder> {
public static final String NAME = "time_series";
public static final ParseField KEYED_FIELD = new ParseField("keyed");
public static final ParseField SIZE_FIELD = new ParseField("size");
public static final InstantiatingObjectParser<TimeSeriesAggregationBuilder, String> PARSER;

private boolean keyed;
private int size;

private static final int DEFAULT_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

static {
InstantiatingObjectParser.Builder<TimeSeriesAggregationBuilder, String> parser = InstantiatingObjectParser.builder(
@@ -41,17 +46,23 @@ public class TimeSeriesAggregationBuilder extends AbstractAggregationBuilder<TimeSeriesAggregationBuilder> {
TimeSeriesAggregationBuilder.class
);
parser.declareBoolean(optionalConstructorArg(), KEYED_FIELD);
parser.declareInt(optionalConstructorArg(), SIZE_FIELD);
PARSER = parser.build();
}

public TimeSeriesAggregationBuilder(String name) {
this(name, true);
this(name, true, DEFAULT_SIZE);
}

public TimeSeriesAggregationBuilder(String name, Boolean keyed) {
this(name, keyed, DEFAULT_SIZE);
}

@ParserConstructor
public TimeSeriesAggregationBuilder(String name, Boolean keyed, Integer size) {
super(name);
this.keyed = keyed != null ? keyed : true;
this.size = size != null ? size : DEFAULT_SIZE;
}

protected TimeSeriesAggregationBuilder(
@@ -61,16 +72,19 @@ protected TimeSeriesAggregationBuilder(
) {
super(clone, factoriesBuilder, metadata);
this.keyed = clone.keyed;
this.size = clone.size;
}

public TimeSeriesAggregationBuilder(StreamInput in) throws IOException {
super(in);
keyed = in.readBoolean();
size = in.readVInt();
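// size is now part of the wire format; nodes before 8.7.0 can't read it
// (hence the version skips added to the YAML REST tests below).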
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeBoolean(keyed);
out.writeVInt(size);
}

@Override
@@ -79,13 +93,14 @@ protected AggregatorFactory doBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
-    return new TimeSeriesAggregationFactory(name, keyed, context, parent, subFactoriesBuilder, metadata);
+    return new TimeSeriesAggregationFactory(name, keyed, size, context, parent, subFactoriesBuilder, metadata);
}

@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(KEYED_FIELD.getPreferredName(), keyed);
builder.field(SIZE_FIELD.getPreferredName(), size);
builder.endObject();
return builder;
}
@@ -118,6 +133,18 @@ public void setKeyed(boolean keyed) {
this.keyed = keyed;
}

public TimeSeriesAggregationBuilder setSize(int size) {
if (size <= 0) {
throw new IllegalArgumentException("[size] must be greater than 0. Found [" + size + "] in [" + name + "]");
}
this.size = size;
return this;
}

public int getSize() {
return size;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
TimeSeriesAggregationFactory.java
@@ -21,21 +21,25 @@ public class TimeSeriesAggregationFactory extends AggregatorFactory {

private final boolean keyed;

private final int size;

public TimeSeriesAggregationFactory(
String name,
boolean keyed,
int size,
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, subFactoriesBuilder, metadata);
this.keyed = keyed;
this.size = size;
}

@Override
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
throws IOException {
-    return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
+    return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata, size);
}
}
TimeSeriesAggregator.java
@@ -30,6 +30,7 @@ public class TimeSeriesAggregator extends BucketsAggregator {

protected final BytesKeyedBucketOrds bucketOrds;
private final boolean keyed;
private final int size;

public TimeSeriesAggregator(
String name,
@@ -38,11 +39,13 @@ public TimeSeriesAggregator(
AggregationContext context,
Aggregator parent,
CardinalityUpperBound bucketCardinality,
-    Map<String, Object> metadata
+    Map<String, Object> metadata,
+    int size
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
this.keyed = keyed;
bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), bucketCardinality);
this.size = size;
}

@Override
@@ -66,6 +69,9 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
);
bucket.bucketOrd = ordsEnum.ord();
buckets.add(bucket);
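// Stop collecting once the requested number of buckets for this owning ordinal is reached.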
if (buckets.size() >= size) {
break;
}
}
allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
}
TimeSeriesAggregationBuilderTests.java
@@ -14,7 +14,8 @@ public class TimeSeriesAggregationBuilderTests extends AggregationBuilderTestCase<TimeSeriesAggregationBuilder> {

@Override
protected TimeSeriesAggregationBuilder createTestAggregatorBuilder() {
-    return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean());
+    // Size is set large enough that tests not intending to hit the size limit shouldn't see it.
+    return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean(), randomIntBetween(1000, 100_000));
}

}
TimeSeriesAggregatorTests.java
@@ -180,6 +180,45 @@ public void testMultiBucketAggregationAsSubAggregation() throws IOException {
timeSeriesTestCase(tsBuilder, new MatchAllDocsQuery(), buildIndex, verifier);
}

public void testAggregationSize() throws IOException {
CheckedConsumer<RandomIndexWriter, IOException> buildIndex = multiTsWriter();

List<Consumer<InternalTimeSeries>> verifiers = new ArrayList<>();

verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L)));
verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L)));
verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(2L)));

for (int i = 1; i < 3; i++) {
int size = i;
Consumer<InternalTimeSeries> limitedVerifier = ts -> {
assertThat(ts.getBuckets(), hasSize(size));

for (int j = 0; j < size; j++) {
verifiers.get(j).accept(ts);
}
};

TimeSeriesAggregationBuilder limitedTsBuilder = new TimeSeriesAggregationBuilder("by_tsid");
limitedTsBuilder.setSize(i);
timeSeriesTestCase(limitedTsBuilder, new MatchAllDocsQuery(), buildIndex, limitedVerifier);
}
}

private CheckedConsumer<RandomIndexWriter, IOException> multiTsWriter() {
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
return iw -> {
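// Eight docs across three series: {aaa,xxx} x2, {aaa,yyy} x2, {bbb,zzz} x4.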
writeTS(iw, startTime + 1, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 1 });
writeTS(iw, startTime + 2, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 2 });
writeTS(iw, startTime + 3, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 3 });
writeTS(iw, startTime + 4, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 4 });
writeTS(iw, startTime + 5, new Object[] { "dim1", "aaa", "dim2", "xxx" }, new Object[] { "val1", 5 });
writeTS(iw, startTime + 6, new Object[] { "dim1", "aaa", "dim2", "yyy" }, new Object[] { "val1", 6 });
writeTS(iw, startTime + 7, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 7 });
writeTS(iw, startTime + 8, new Object[] { "dim1", "bbb", "dim2", "zzz" }, new Object[] { "val1", 8 });
};
}

private void timeSeriesTestCase(
TimeSeriesAggregationBuilder builder,
Query query,
time_series YAML REST tests
@@ -54,7 +54,7 @@ setup:
---
"Basic test":
- skip:
version: " - 8.5.99"
version: " - 8.6.99"
reason: Time series result serialization changed in 8.6.0

- do:
@@ -74,11 +74,55 @@


- match: { hits.total.value: 1 }
- length: { aggregations.ts.buckets: 1 }
- length: { aggregations: 1 }

- match: { aggregations.ts.buckets.0.key: { "key": "foo" } }
- match: { aggregations.ts.buckets.0.doc_count: 1 }

---
"Size test":
- skip:
version: " - 8.6.99"
reason: Size added in 8.7.0

- do:
search:
index: tsdb
body:
query:
range:
"@timestamp":
gte: "2019-01-01T00:10:00Z"
size: 0
aggs:
ts:
time_series:
keyed: false
size: 1

- length: { aggregations.ts.buckets: 1 }
- match: { aggregations.ts.buckets.0.key: { "key": "bar" } }

- do:
search:
index: tsdb
body:
query:
range:
"@timestamp":
gte: "2019-01-01T00:10:00Z"
size: 0
aggs:
ts:
time_series:
keyed: false
size: 3

- length: { aggregations.ts.buckets: 3 }
- match: { aggregations.ts.buckets.0.key: { "key": "bar" } }
- match: { aggregations.ts.buckets.1.key: { "key": "baz" } }
- match: { aggregations.ts.buckets.2.key: { "key": "foo" } }
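# Buckets come back in ascending tsid order, hence bar, baz, foo.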

---
"Sampler aggregation with nested time series aggregation failure":
- skip: