Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AutoIntervalDateHistogram.testReduce random failures #32301

Merged
merged 8 commits into from
Jul 31, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -53,7 +54,7 @@ public class AutoDateHistogramAggregationBuilder

public static final String NAME = "auto_date_histogram";

public static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");
private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");

private static final ObjectParser<AutoDateHistogramAggregationBuilder, Void> PARSER;
static {
Expand All @@ -63,6 +64,29 @@ public class AutoDateHistogramAggregationBuilder
PARSER.declareInt(AutoDateHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD);
}

/**
 * Builds the ladder of rounding configurations, ordered from the finest
 * granularity (seconds) to the coarsest (years), that the aggregation uses
 * to choose a bucket interval.
 * <p>
 * Computed dynamically because each rounding depends on the supplied time
 * zone; this allocation-heavy method should not be invoked in a tight loop.
 *
 * @param timeZone the time zone applied to every rounding; may be {@code null},
 *                 in which case roundings default to UTC
 * @return Array of RoundingInfo, one entry per supported calendar unit
 */
static RoundingInfo[] buildRoundings(DateTimeZone timeZone) {
    // Each entry pairs a calendar-unit rounding with its rough duration in
    // millis and the inner interval multiples the reduce phase may pick from.
    return new RoundingInfo[] {
        new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE, timeZone),
            1000L, 1, 5, 10, 30),
        new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR, timeZone),
            60 * 1000L, 1, 5, 10, 30),
        new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY, timeZone),
            60 * 60 * 1000L, 1, 3, 12),
        new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH, timeZone),
            24 * 60 * 60 * 1000L, 1, 7),
        new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR, timeZone),
            30 * 24 * 60 * 60 * 1000L, 1, 3),
        new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY, timeZone),
            365 * 24 * 60 * 60 * 1000L, 1, 5, 10, 20, 50, 100),
    };
}

public static AutoDateHistogramAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new AutoDateHistogramAggregationBuilder(aggregationName), null);
}
Expand Down Expand Up @@ -116,14 +140,7 @@ public int getNumBuckets() {
@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
RoundingInfo[] roundings = new RoundingInfo[6];
roundings[0] = new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE), 1000L, 1, 5, 10, 30);
roundings[1] = new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR), 60 * 1000L, 1, 5, 10, 30);
roundings[2] = new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY), 60 * 60 * 1000L, 1, 3, 12);
roundings[3] = new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH), 24 * 60 * 60 * 1000L, 1, 7);
roundings[4] = new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR), 30 * 24 * 60 * 60 * 1000L, 1, 3);
roundings[5] = new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY), 365 * 24 * 60 * 60 * 1000L, 1, 5, 10, 20, 50, 100);

RoundingInfo[] roundings = buildRoundings(timeZone());
int maxRoundingInterval = Arrays.stream(roundings,0, roundings.length-1)
.map(rounding -> rounding.innerIntervals)
.flatMapToInt(Arrays::stream)
Expand All @@ -139,10 +156,10 @@ public int getNumBuckets() {
return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData);
}

private Rounding createRounding(DateTimeUnit interval) {
private static Rounding createRounding(DateTimeUnit interval, DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder = Rounding.builder(interval);
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
return new BucketReduceResult(list, roundingInfo, roundingIdx);
}

private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, RoundingInfo[] roundings) {
private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
RoundingInfo[] roundings) {
if (roundingIdx == roundings.length - 1) {
return roundingIdx;
}
Expand Down Expand Up @@ -509,7 +510,8 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
pipelineAggregators(), getMetaData());
}

private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult, ReduceContext reduceContext) {
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult,
ReduceContext reduceContext) {
List<Bucket> buckets = reducedBucketsResult.buckets;
RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
int roundingIdx = reducedBucketsResult.roundingIdx;
Expand Down Expand Up @@ -539,7 +541,7 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
key = roundingInfo.rounding.round(bucket.key);
}
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
Expand Down Expand Up @@ -51,21 +49,16 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
public void setUp() throws Exception {
super.setUp();
format = randomNumericDocValueFormat();

roundingInfos = new RoundingInfo[6];
roundingInfos[0] = new RoundingInfo(Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), 1, 5, 10, 30);
roundingInfos[1] = new RoundingInfo(Rounding.builder(DateTimeUnit.MINUTES_OF_HOUR).build(), 1, 5, 10, 30);
roundingInfos[2] = new RoundingInfo(Rounding.builder(DateTimeUnit.HOUR_OF_DAY).build(), 1, 3, 12);
roundingInfos[3] = new RoundingInfo(Rounding.builder(DateTimeUnit.DAY_OF_MONTH).build(), 1, 7);
roundingInfos[4] = new RoundingInfo(Rounding.builder(DateTimeUnit.MONTH_OF_YEAR).build(), 1, 3);
roundingInfos[5] = new RoundingInfo(Rounding.builder(DateTimeUnit.YEAR_OF_CENTURY).build(), 1, 10, 20, 50, 100);
}

@Override
protected InternalAutoDateHistogram createTestInstance(String name,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData,
InternalAggregations aggregations) {
AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to create a builder here anymore. These tests don't currently set the time zone in the builder, so aggregationBuilder.timeZone() will always be null, which implicitly means the time zone will be treated as UTC. As mentioned before, we should add a test that does set the time zone to verify it's treated correctly, but that doesn't need to be part of this PR — so I think we can remove this line and just pass in null below?

roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(aggregationBuilder.timeZone());

int nbBuckets = randomNumberOfBuckets();
int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
Expand All @@ -81,6 +74,7 @@ protected InternalAutoDateHistogram createTestInstance(String name,
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
BucketInfo bucketInfo = new BucketInfo(roundingInfos, randomIntBetween(0, roundingInfos.length - 1), subAggregations);


return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
}

Expand All @@ -92,13 +86,50 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAut
roundingIdx = histogram.getBucketInfo().roundingIdx;
}
}
Map<Long, Long> expectedCounts = new TreeMap<>();
for (Histogram histogram : inputs) {
RoundingInfo roundingInfo = roundingInfos[roundingIdx];

long lowest = Long.MAX_VALUE;
long highest = 0;
for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
expectedCounts.compute(roundingInfos[roundingIdx].rounding.round(((DateTime) bucket.getKey()).getMillis()),
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
if (bucketKey < lowest) {
lowest = bucketKey;
}
if (bucketKey > highest) {
highest = bucketKey;
}
}
}
long normalizedDuration = (highest - lowest) / roundingInfo.getRoughEstimateDurationMillis();
long innerIntervalToUse = 0;
for (int interval : roundingInfo.innerIntervals) {
if (normalizedDuration / interval < maxNumberOfBuckets()) {
innerIntervalToUse = interval;
}
}
Map<Long, Long> expectedCounts = new TreeMap<>();
long intervalInMillis = innerIntervalToUse*roundingInfo.getRoughEstimateDurationMillis();
for (long keyForBucket = roundingInfo.rounding.round(lowest);
keyForBucket <= highest;
keyForBucket = keyForBucket + intervalInMillis) {
expectedCounts.put(keyForBucket, 0L);

for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
long roundedBucketKey = roundingInfo.rounding.round(bucketKey);
if (roundedBucketKey >= keyForBucket
&& roundedBucketKey < keyForBucket + intervalInMillis) {
long count = bucket.getDocCount();
expectedCounts.compute(keyForBucket,
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + count);
}
}
}
}


Map<Long, Long> actualCounts = new TreeMap<>();
for (Histogram.Bucket bucket : reduced.getBuckets()) {
actualCounts.compute(((DateTime) bucket.getKey()).getMillis(),
Expand All @@ -117,12 +148,6 @@ protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
return ParsedAutoDateHistogram.class;
}

@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32215")
public void testReduceRandom() {
super.testReduceRandom();
}

@Override
protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram instance) {
String name = instance.getName();
Expand Down