Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mitigate date histogram slowdowns with non-fixed timezones. #30534

Merged
merged 5 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.DateTimeUnit;
Expand All @@ -27,8 +31,13 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.AtomicNumericFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Relation;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.BucketOrder;
Expand All @@ -44,6 +53,8 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeField;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -351,36 +362,121 @@ public String getType() {
return NAME;
}

/**
 * Try to replace the configured time zone with an equivalent fixed-offset zone for this shard.
 *
 * If the time zone is not fixed, a field is set and no script is used, this looks up any
 * value of the field in the shard's reader, finds the time zone transitions that surround
 * it, and checks whether all values of the field (and their rounded values) fall between
 * those two transitions. When they do, a fixed zone with the offset at that instant is
 * returned; in every other case the configured time zone is returned unchanged (possibly
 * {@code null}).
 *
 * NOTE: this can't be done in rewrite() because the timezone is then also used on the
 * coordinating node in order to generate missing buckets, which may cross a transition
 * even though data on the shards doesn't.
 *
 * @param context the shard context used to resolve the field mapping and the index reader
 * @return the configured time zone, or a fixed-offset zone usable for this shard's data
 * @throws IOException if reading doc values from the index fails
 */
DateTimeZone rewriteTimeZone(QueryShardContext context) throws IOException {
    final DateTimeZone tz = timeZone();
    // Only non-fixed zones applied to a plain (non-scripted) field can be rewritten.
    if (field() == null || tz == null || tz.isFixed() || script() != null) {
        return tz;
    }

    final MappedFieldType ft = context.fieldMapper(field());
    final IndexReader reader = context.getIndexReader();
    if (ft == null || reader == null) {
        return tz;
    }

    // Pick the first value found in any segment; it only serves to locate the
    // surrounding transitions.
    Long anyInstant = null;
    final IndexNumericFieldData fieldData = context.getForField(ft);
    for (LeafReaderContext ctx : reader.leaves()) {
        final AtomicNumericFieldData leafFD = fieldData.load(ctx);
        final SortedNumericDocValues values = leafFD.getLongValues();
        if (values.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
            anyInstant = values.nextValue();
            break;
        }
    }
    if (anyInstant == null) {
        // no document has a value for this field
        return tz;
    }

    final long instant = anyInstant;
    final long prevTransition = tz.previousTransition(instant);
    final long nextTransition = tz.nextTransition(instant);

    // Not only the values but also their rounded values must be within
    // [prevTransition, nextTransition].
    final long low;
    final DateTimeUnit intervalAsUnit = getIntervalAsDateTimeUnit();
    if (intervalAsUnit != null) {
        final DateTimeField dateTimeField = intervalAsUnit.field(tz);
        low = dateTimeField.roundCeiling(prevTransition);
    } else {
        final TimeValue intervalAsMillis = getIntervalAsTimeValue();
        low = Math.addExact(prevTransition, intervalAsMillis.millis());
    }
    // rounding rounds down, so 'nextTransition' is a good upper bound
    final long high = nextTransition;

    final DocValueFormat format = ft.docValueFormat(null, null);
    final String formattedLow = format.format(low);
    final String formattedHigh = format.format(high);
    if (ft.isFieldWithinQuery(reader, formattedLow, formattedHigh,
            true, false, tz, null, context) == Relation.WITHIN) {
        // All values in this reader have the same offset despite daylight saving times.
        // This is very common for location-based timezones such as Europe/Paris in
        // combination with time-based indices.
        return DateTimeZone.forOffsetMillis(tz.getOffset(instant));
    }
    return tz;
}

@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
Rounding rounding = createRounding();
final DateTimeZone tz = timeZone();
final Rounding rounding = createRounding(tz);
final DateTimeZone rewrittenTimeZone = rewriteTimeZone(context.getQueryShardContext());
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = createRounding(rewrittenTimeZone);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, context, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, interval, dateHistogramInterval, offset, order, keyed, minDocCount,
rounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
return new DateHistogramAggregatorFactory(name, config, offset, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
}

private Rounding createRounding() {
Rounding.Builder tzRoundingBuilder;
/** Return the interval as a date time unit if applicable. If this returns
* {@code null} then it means that the interval is expressed as a fixed
* {@link TimeValue} and may be accessed via
* {@link #getIntervalAsTimeValue()}. */
private DateTimeUnit getIntervalAsDateTimeUnit() {
if (dateHistogramInterval != null) {
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
if (dateTimeUnit != null) {
tzRoundingBuilder = Rounding.builder(dateTimeUnit);
} else {
// the interval is a time value?
tzRoundingBuilder = Rounding.builder(
TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, getClass().getSimpleName() + ".interval"));
}
return DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
}
return null;
}

/**
 * Resolve the configured interval to a {@link TimeValue}: the parsed
 * {@code dateHistogramInterval} expression when one is set, otherwise the
 * numeric {@code interval} interpreted as milliseconds. Intended to be called
 * only after {@link #getIntervalAsDateTimeUnit()} returned {@code null}.
 */
private TimeValue getIntervalAsTimeValue() {
    if (dateHistogramInterval == null) {
        // plain numeric interval, expressed in milliseconds
        return TimeValue.timeValueMillis(interval);
    }
    final String settingName = getClass().getSimpleName() + ".interval";
    return TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, settingName);
}

private Rounding createRounding(DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder;
DateTimeUnit intervalAsUnit = getIntervalAsDateTimeUnit();
if (intervalAsUnit != null) {
tzRoundingBuilder = Rounding.builder(intervalAsUnit);
} else {
// the interval is an integer time value in millis?
tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
tzRoundingBuilder = Rounding.builder(getIntervalAsTimeValue());
}
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class DateHistogramAggregator extends BucketsAggregator {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
private final Rounding shardRounding;
private final BucketOrder order;
private final boolean keyed;

Expand All @@ -64,14 +65,15 @@ class DateHistogramAggregator extends BucketsAggregator {
private final LongHash bucketOrds;
private long offset;

DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, long offset, BucketOrder order,
boolean keyed,
DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
long offset, BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);;
this.keyed = keyed;
Expand Down Expand Up @@ -105,7 +107,9 @@ public void collect(int doc, long bucket) throws IOException {
long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long value = values.nextValue();
long rounded = rounding.round(value - offset) + offset;
// We can use shardRounding here, which is sometimes more efficient
// if daylight saving times are involved.
long rounded = shardRounding.round(value - offset) + offset;
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
Expand Down Expand Up @@ -138,6 +142,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));

// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,27 @@
public final class DateHistogramAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric, DateHistogramAggregatorFactory> {

private final DateHistogramInterval dateHistogramInterval;
private final long interval;
private final long offset;
private final BucketOrder order;
private final boolean keyed;
private final long minDocCount;
private final ExtendedBounds extendedBounds;
private Rounding rounding;
private final Rounding rounding;
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, long interval,
DateHistogramInterval dateHistogramInterval, long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, ExtendedBounds extendedBounds, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config,
long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, SearchContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.interval = interval;
this.dateHistogramInterval = dateHistogramInterval;
this.offset = offset;
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.rounding = rounding;
this.shardRounding = shardRounding;
}

public long minDocCount() {
Expand All @@ -77,8 +76,8 @@ protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggrega

private Aggregator createAggregator(ValuesSource.Numeric valuesSource, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateHistogramAggregator(name, factories, rounding, offset, order, keyed, minDocCount, extendedBounds, valuesSource,
config.format(), context, parent, pipelineAggregators, metaData);
return new DateHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), context, parent, pipelineAggregators, metaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.joda.time.DateTimeZone;
import org.junit.Assume;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -120,4 +133,73 @@ private List<BucketOrder> randomOrder() {
return orders;
}

/**
 * Create a document holding {@code millis} under {@code field}, indexed both as a
 * {@link LongPoint} and as a {@link SortedNumericDocValuesField}.
 */
private static Document documentForDate(String field, long millis) {
    final LongPoint point = new LongPoint(field, millis);
    final SortedNumericDocValuesField docValue = new SortedNumericDocValuesField(field, millis);

    final Document document = new Document();
    document.add(point);
    document.add(docValue);
    return document;
}

/**
 * Checks {@code rewriteTimeZone} against two readers opened on the same writer: one opened
 * after the first two documents were added, and one opened after a third document was added.
 * The builder is reconfigured step by step (no time zone, fixed zone, Europe/Paris with
 * calendar and fixed intervals) and the returned time zone is asserted for both readers.
 */
public void testRewriteTimeZone() throws IOException {
Assume.assumeTrue(getCurrentTypes().length > 0); // we need mappings
FormatDateTimeFormatter format = Joda.forPattern("strict_date_optional_time");

try (Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {

// NOTE(review): these two instants presumably lie between the same pair of
// Europe/Paris transitions; the "rewrite" assertions below rely on that.
w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-11T11:55:00").getMillis()));
w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2017-10-30T18:13:00").getMillis()));

// Reader opened before the third document is added.
try (IndexReader readerThatDoesntCross = DirectoryReader.open(w)) {

// NOTE(review): this instant is presumably on the other side of a Europe/Paris
// transition, and should only be visible to the reader opened after it.
w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-25T02:44:00").getMillis()));

try (IndexReader readerThatCrosses = DirectoryReader.open(w)) {

QueryShardContext shardContextThatDoesntCross = createShardContext(readerThatDoesntCross);
QueryShardContext shardContextThatCrosses = createShardContext(readerThatCrosses);

DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(DATE_FIELD_NAME);
builder.dateHistogramInterval(DateHistogramInterval.DAY);

// no timeZone => no rewrite
assertNull(builder.rewriteTimeZone(shardContextThatDoesntCross));
assertNull(builder.rewriteTimeZone(shardContextThatCrosses));

// fixed timeZone => no rewrite
DateTimeZone tz = DateTimeZone.forOffsetHours(1);
builder.timeZone(tz);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// daylight-saving-times => rewrite if doesn't cross
tz = DateTimeZone.forID("Europe/Paris");
builder.timeZone(tz);
assertEquals(DateTimeZone.forOffsetHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// Rounded values are no longer all within the same transitions => no rewrite
builder.dateHistogramInterval(DateHistogramInterval.MONTH);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// Start from a fresh builder so the interval is given in milliseconds
// rather than as a DateHistogramInterval.
builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(DATE_FIELD_NAME);
builder.timeZone(tz);

builder.interval(1000L * 60 * 60 * 24); // ~ 1 day
assertEquals(DateTimeZone.forOffsetHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// Because the interval is large, rounded values are not
// within the same transitions as the values => no rewrite
builder.interval(1000L * 60 * 60 * 24 * 30); // ~ 1 month
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
}
}
}
}

}
Loading