Skip to content

Commit

Permalink
backport of elastic#32723
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Sanwald committed Aug 17, 2018
1 parent b3a60e3 commit 92e63cf
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public int getNumBuckets() {
return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData);
}

private static Rounding createRounding(DateTimeUnit interval, DateTimeZone timeZone) {
static Rounding createRounding(DateTimeUnit interval, DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder = Rounding.builder(interval);
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
return currentResult;
}
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
bucketInfo.roundingInfos);
bucketInfo.roundingInfos, targetBuckets);
RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx];
Rounding rounding = roundingInfo.rounding;
// merge buckets using the new rounding
Expand Down Expand Up @@ -447,8 +447,8 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
return new BucketReduceResult(list, roundingInfo, roundingIdx);
}

private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
RoundingInfo[] roundings) {
static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
RoundingInfo[] roundings, int targetBuckets) {
if (roundingIdx == roundings.length - 1) {
return roundingIdx;
}
Expand Down Expand Up @@ -480,7 +480,7 @@ private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
currentKey = currentRounding.nextRoundingValue(currentKey);
}
currentRoundingIdx++;
} while (requiredBuckets > (targetBuckets * roundings[roundingIdx].getMaximumInnerInterval())
} while (requiredBuckets > (targetBuckets * roundings[currentRoundingIdx - 1].getMaximumInnerInterval())
&& currentRoundingIdx < roundings.length);
// The loop will increase past the correct rounding index here so we
// need to subtract one to get the rounding index we need
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
Expand All @@ -28,7 +29,11 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -39,6 +44,8 @@
import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.createRounding;
import static org.hamcrest.Matchers.equalTo;

public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregationTestCase<InternalAutoDateHistogram> {

Expand All @@ -56,11 +63,12 @@ protected InternalAutoDateHistogram createTestInstance(String name,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData,
InternalAggregations aggregations) {

roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null);
int nbBuckets = randomNumberOfBuckets();
int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);

long startingDate = System.currentTimeMillis();

long interval = randomIntBetween(1, 3);
Expand All @@ -72,23 +80,41 @@ protected InternalAutoDateHistogram createTestInstance(String name,
}
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
BucketInfo bucketInfo = new BucketInfo(roundingInfos, randomIntBetween(0, roundingInfos.length - 1), subAggregations);
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
}

/*
 * Regression test for a bug where getAppropriateRounding was only ever using the first innerIntervals
 * passed in (those of the starting rounding index), instead of using the innerIntervals of the
 * rounding currently being evaluated by its loop.
 */
public void testGetAppropriateRoundingUsesCorrectIntervals() {
// NOTE(review): the array has six slots but only indices 0-2 are populated below, so indices 3-5
// stay null. Presumably getAppropriateRounding never advances past index 2 for this input — confirm.
RoundingInfo[] roundings = new RoundingInfo[6];
DateTimeZone timeZone = DateTimeZone.UTC;
// Since we pass 0 as the starting index to getAppropriateRounding, we'll also use
// an innerInterval that is quite large, such that targetBuckets * roundings[i].getMaximumInnerInterval()
// will be larger than the estimate.
// Seconds rounding with a single, deliberately large inner interval (1000).
roundings[0] = new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE, timeZone),
1000L, 1000);
// Minutes rounding with inner intervals 1, 5, 10 and 30.
roundings[1] = new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR, timeZone),
60 * 1000L, 1, 5, 10, 30);
// Hours rounding with inner intervals 1, 3 and 12.
roundings[2] = new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY, timeZone),
60 * 60 * 1000L, 1, 3, 12);

// NOTE(review): this method is declared void, yet the next statement returns a value and is identical
// to the return statement of createTestInstance. It looks like a leftover line from the diff view
// rather than part of this test — confirm against the real file.
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
// The key range under test starts at 2018-01-01T00:00:01Z and spans exactly one day.
OffsetDateTime timestamp = Instant.parse("2018-01-01T00:00:01.000Z").atOffset(ZoneOffset.UTC);
// We want to pass a roundingIdx of zero, because in order to reproduce this bug, we need the function
// to increment the rounding (the bug was that the function would not use the innerIntervals
// from the new rounding).
// Keys are passed as epoch milliseconds (epoch seconds * 1000); targetBuckets is 25.
int result = InternalAutoDateHistogram.getAppropriateRounding(timestamp.toEpochSecond()*1000,
timestamp.plusDays(1).toEpochSecond()*1000, 0, roundings, 25);
// The expected result is index 2, i.e. the HOUR_OF_DAY rounding defined above.
assertThat(result, equalTo(2));
}

@Override
protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAutoDateHistogram> inputs) {
int roundingIdx = 0;
for (InternalAutoDateHistogram histogram : inputs) {
if (histogram.getBucketInfo().roundingIdx > roundingIdx) {
roundingIdx = histogram.getBucketInfo().roundingIdx;
}
}
RoundingInfo roundingInfo = roundingInfos[roundingIdx];

long lowest = Long.MAX_VALUE;
long highest = 0;

for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
Expand All @@ -100,35 +126,72 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAut
}
}
}

int roundingIndex = reduced.getBucketInfo().roundingIdx;
RoundingInfo roundingInfo = roundingInfos[roundingIndex];

long normalizedDuration = (highest - lowest) / roundingInfo.getRoughEstimateDurationMillis();
long innerIntervalToUse = 0;
for (int interval : roundingInfo.innerIntervals) {
if (normalizedDuration / interval < maxNumberOfBuckets()) {
innerIntervalToUse = interval;
long innerIntervalToUse = roundingInfo.innerIntervals[0];
int innerIntervalIndex = 0;

// First, try to calculate the correct innerInterval using the normalizedDuration.
// This handles cases where highest and lowest are further apart than the interval being used.
if (normalizedDuration != 0) {
for (int j = roundingInfo.innerIntervals.length-1; j >= 0; j--) {
int interval = roundingInfo.innerIntervals[j];
if (normalizedDuration / interval < reduced.getBuckets().size()) {
innerIntervalToUse = interval;
innerIntervalIndex = j;
}
}
}

long intervalInMillis = innerIntervalToUse * roundingInfo.getRoughEstimateDurationMillis();
int bucketCount = getBucketCount(lowest, highest, roundingInfo, intervalInMillis);

//Next, if our bucketCount is still above what we need, we'll go back and determine the interval
// based on a size calculation.
if (bucketCount > reduced.getBuckets().size()) {
for (int i = innerIntervalIndex; i < roundingInfo.innerIntervals.length; i++) {
long newIntervalMillis = roundingInfo.innerIntervals[i] * roundingInfo.getRoughEstimateDurationMillis();
if (getBucketCount(lowest, highest, roundingInfo, newIntervalMillis) <= reduced.getBuckets().size()) {
innerIntervalToUse = roundingInfo.innerIntervals[i];
intervalInMillis = innerIntervalToUse * roundingInfo.getRoughEstimateDurationMillis();
}
}
}

Map<Long, Long> expectedCounts = new TreeMap<>();
long intervalInMillis = innerIntervalToUse*roundingInfo.getRoughEstimateDurationMillis();
for (long keyForBucket = roundingInfo.rounding.round(lowest);
keyForBucket <= highest;
keyForBucket <= roundingInfo.rounding.round(highest);
keyForBucket = keyForBucket + intervalInMillis) {
expectedCounts.put(keyForBucket, 0L);

// Iterate through the input buckets, and for each bucket, determine if it's inside
// the range of the bucket in the outer loop. if it is, add the doc count to the total
// for that bucket.

for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
long roundedBucketKey = roundingInfo.rounding.round(bucketKey);
long roundedBucketKey = roundingInfo.rounding.round(((DateTime) bucket.getKey()).getMillis());
long docCount = bucket.getDocCount();
if (roundedBucketKey >= keyForBucket
&& roundedBucketKey < keyForBucket + intervalInMillis) {
long count = bucket.getDocCount();
expectedCounts.compute(keyForBucket,
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + count);
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + docCount);
}
}
}
}

// If there is only a single bucket, and we haven't added it above, add a bucket with no documents.
// this step is necessary because of the roundedBucketKey < keyForBucket + intervalInMillis above.
if (roundingInfo.rounding.round(lowest) == roundingInfo.rounding.round(highest) && expectedCounts.isEmpty()) {
expectedCounts.put(roundingInfo.rounding.round(lowest), 0L);
}


// pick out the actual reduced values to make the assertion more readable
Map<Long, Long> actualCounts = new TreeMap<>();
for (Histogram.Bucket bucket : reduced.getBuckets()) {
actualCounts.compute(((DateTime) bucket.getKey()).getMillis(),
Expand All @@ -137,6 +200,16 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAut
assertEquals(expectedCounts, actualCounts);
}

/**
 * Counts the bucket keys visited when stepping from the rounded {@code lowest} key up to and
 * including the rounded {@code highest} key in increments of {@code intervalInMillis}.
 *
 * @param lowest           the smallest key; rounded with {@code roundingInfo.rounding} before counting
 * @param highest          the largest key; rounded with {@code roundingInfo.rounding} before counting
 * @param roundingInfo     supplies the rounding applied to both endpoints
 * @param intervalInMillis the step between consecutive bucket keys; must be positive
 * @return the number of keys visited, or 0 if the rounded {@code lowest} is greater than the
 *         rounded {@code highest}
 * @throws IllegalArgumentException if {@code intervalInMillis} is not positive, since the counting
 *         loop could never advance past the upper bound
 */
private int getBucketCount(long lowest, long highest, RoundingInfo roundingInfo, long intervalInMillis) {
    if (intervalInMillis <= 0) {
        throw new IllegalArgumentException("intervalInMillis must be positive, got [" + intervalInMillis + "]");
    }
    // Both endpoints are loop-invariant, so round them once instead of on every iteration.
    final long firstKey = roundingInfo.rounding.round(lowest);
    final long lastKey = roundingInfo.rounding.round(highest);
    int bucketCount = 0;
    for (long keyForBucket = firstKey; keyForBucket <= lastKey; keyForBucket += intervalInMillis) {
        bucketCount++;
    }
    return bucketCount;
}

@Override
protected Writeable.Reader<InternalAutoDateHistogram> instanceReader() {
return InternalAutoDateHistogram::new;
Expand Down

0 comments on commit 92e63cf

Please sign in to comment.