Speed up date_histogram without children (#63643)
This speeds up `date_histogram` aggregations without a parent or
children. This is quite common - it's the aggregation that Kibana's Discover
uses all over the place. Also, we hope to be able to use the same
mechanism to speed up aggs with children one day, but that day isn't today.

The kind of speedup we're seeing is fairly substantial in many cases:
```
|            metric            |                 operation                  |  before |   after | unit |
| 90th percentile service time |           date_histogram_calendar_interval | 9266.07 | 1376.13 | ms |
| 90th percentile service time |   date_histogram_calendar_interval_with_tz | 9217.21 | 1372.67 | ms |
| 90th percentile service time |              date_histogram_fixed_interval | 8817.36 | 1312.67 | ms |
| 90th percentile service time |      date_histogram_fixed_interval_with_tz | 8801.71 | 1311.69 | ms | <-- discover's agg
| 90th percentile service time | date_histogram_fixed_interval_with_metrics | 44660.2 | 43789.5 | ms |
```

This uses the work we did in #61467 to precompute the rounding points for
a `date_histogram`. Now, when we know the rounding points we execute the
`date_histogram` as a `range` aggregation (see the sketch after this
list). This is nice for three reasons:
1. We can further rewrite the `range` aggregation (see below)
2. We don't need to allocate a hash to convert rounding points
   to ordinals.
3. We can send precise cardinality estimates to sub-aggs.
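
Roughly, that conversion looks like the sketch below. It is an illustration
only: the `rangesFromRoundingPoints` helper is hypothetical; it just pairs up
consecutive precomputed bucket start times and leaves the last bucket
unbounded, matching the rewritten `range` agg shown further down.

```
import java.util.ArrayList;
import java.util.List;

class RoundingPointsToRanges {
    // Sketch only: turn precomputed rounding points (sorted bucket start
    // times in epoch millis) into [from, to) boundaries for a range agg.
    static List<long[]> rangesFromRoundingPoints(long[] points) {
        List<long[]> ranges = new ArrayList<>();
        for (int i = 0; i < points.length; i++) {
            long from = points[i];
            // The last bucket is unbounded above, like the last range in
            // the rewritten aggregation below.
            long to = i + 1 < points.length ? points[i + 1] : Long.MAX_VALUE;
            ranges.add(new long[] { from, to });
        }
        return ranges;
    }
}
```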

Points 2 and 3 above are nice, but most of the speed difference comes from
point 1. Specifically, we now look into executing `range` aggregations as
a `filters` aggregation. Normally the `filters` aggregation is quite slow
but when it doesn't have a parent or any children then we can execute it
"filter by filter" which is significantly faster. So fast, in fact, that
it is faster than the original `date_histogram`.
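
To make "filter by filter" concrete, here is a rough sketch of the idea
against plain Lucene APIs. It is not the actual `FiltersAggregator` code; it
just shows the strategy: count each filter directly against every segment, so
we never produce per-document bucket ordinals or run a collection tree.

```
import java.io.IOException;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;

class FilterByFilterSketch {
    // Sketch only: count matches for each filter, one filter at a time.
    // This is only valid when the agg has no parent and no sub-aggs.
    static long[] countFilterByFilter(IndexSearcher searcher, Query[] filters) throws IOException {
        long[] counts = new long[filters.length];
        for (int f = 0; f < filters.length; f++) {
            Weight weight = searcher.createWeight(searcher.rewrite(filters[f]), ScoreMode.COMPLETE_NO_SCORES, 1f);
            for (LeafReaderContext leaf : searcher.getIndexReader().leaves()) {
                Scorer scorer = weight.scorer(leaf);
                if (scorer == null) {
                    continue; // no matches in this segment
                }
                Bits liveDocs = leaf.reader().getLiveDocs();
                DocIdSetIterator it = scorer.iterator();
                for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
                    if (liveDocs == null || liveDocs.get(doc)) {
                        counts[f]++;
                    }
                }
            }
        }
        return counts;
    }
}
```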

The `range` aggregation is *fairly* careful in how it rewrites, giving up
on the `filters` aggregation if it won't collect "filter by filter" and
falling back to its original execution mechanism.
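
In spirit that give-up check boils down to a handful of conditions. The sketch
below is purely illustrative (none of these names come from the commit); it
only mirrors the constraints described above.

```
class FilterByFilterEligibility {
    // Sketch only: fall back to the normal range execution whenever
    // anything would force per-document collection or the rounding
    // points were not precomputed.
    static boolean canCollectFilterByFilter(boolean hasParent, int subAggCount, boolean hasFixedRoundingPoints) {
        return hasParent == false && subAggCount == 0 && hasFixedRoundingPoints;
    }
}
```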


So an aggregation like this:

```
POST _search
{
  "size": 0,
  "query": {
    "range": {
      "dropoff_datetime": {
        "gte": "2015-01-01 00:00:00",
        "lt": "2016-01-01 00:00:00"
      }
    }
  },
  "aggs": {
    "dropoffs_over_time": {
      "date_histogram": {
        "field": "dropoff_datetime",
        "fixed_interval": "60d",
        "time_zone": "America/New_York"
      }
    }
  }
}
```

is executed like:

```
POST _search
{
  "size": 0,
  "query": {
    "range": {
      "dropoff_datetime": {
        "gte": "2015-01-01 00:00:00",
        "lt": "2016-01-01 00:00:00"
      }
    }
  },
  "aggs": {
    "dropoffs_over_time": {
      "range": {
        "field": "dropoff_datetime",
        "ranges": [
          {"from": 1415250000000, "to": 1420434000000},
          {"from": 1420434000000, "to": 1425618000000},
          {"from": 1425618000000, "to": 1430798400000},
          {"from": 1430798400000, "to": 1435982400000},
          {"from": 1435982400000, "to": 1441166400000},
          {"from": 1441166400000, "to": 1446350400000},
          {"from": 1446350400000, "to": 1451538000000},
          {"from": 1451538000000}
        ]
      }
    }
  }
}
```

Which in turn is executed like this:

```
POST _search
{
  "size": 0,
  "query": {
    "range": {
      "dropoff_datetime": {
        "gte": "2015-01-01 00:00:00",
        "lt": "2016-01-01 00:00:00"
      }
    }
  },
  "aggs": {
    "dropoffs_over_time": {
      "filters": {
        "filters": {
          "1": {"range": {"dropoff_datetime": {"gte": "2014-12-30 00:00:00", "lt": "2015-01-05 05:00:00"}}},
          "2": {"range": {"dropoff_datetime": {"gte": "2015-01-05 05:00:00", "lt": "2015-03-06 05:00:00"}}},
          "3": {"range": {"dropoff_datetime": {"gte": "2015-03-06 00:00:00", "lt": "2015-05-05 00:00:00"}}},
          "4": {"range": {"dropoff_datetime": {"gte": "2015-05-05 00:00:00", "lt": "2015-07-04 00:00:00"}}},
          "5": {"range": {"dropoff_datetime": {"gte": "2015-07-04 00:00:00", "lt": "2015-09-02 00:00:00"}}},
          "6": {"range": {"dropoff_datetime": {"gte": "2015-09-02 00:00:00", "lt": "2015-11-01 00:00:00"}}},
          "7": {"range": {"dropoff_datetime": {"gte": "2015-11-01 00:00:00", "lt": "2015-12-31 00:00:00"}}},
          "8": {"range": {"dropoff_datetime": {"gte": "2015-12-31 00:00:00"}}}
        }
      }
    }
  }
}
```

And *that* is faster because we can execute it "filter by filter".

Finally, notice the `range` query filtering the data. That is required for
the data set that I'm using for testing. The "filter by filter" collection
mechanism for the `filters` agg needs special case handling when the query
is a `range` query and the filter is a `range` query and they are both on
the same field. That special case handling "merges" the range query.
Without it "filter by filter" collection is substantially slower. Its still
quite a bit quicker than the standard `filter` collection, but not nearly
as fast as it could be.
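
A minimal sketch of what that merge amounts to, assuming both the top-level
query and a bucket's filter reduce to [from, to) bounds in epoch millis on the
same field (the names are illustrative, not the actual implementation):

```
class RangeMergeSketch {
    // Sketch only: intersect the query's bounds with one bucket's bounds so
    // each filter runs as a single merged range instead of two nested checks.
    static long[] merge(long queryFrom, long queryTo, long bucketFrom, long bucketTo) {
        long from = Math.max(queryFrom, bucketFrom);
        long to = Math.min(queryTo, bucketTo);
        // If from >= to the merged filter matches nothing for this bucket.
        return new long[] { from, to };
    }
}
```
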
nik9000 authored Nov 9, 2020
1 parent b31a8ff commit 7ceed13
Showing 36 changed files with 2,271 additions and 227 deletions.
@@ -495,6 +495,58 @@ setup:
date:
type: date

- do:
bulk:
index: test_2
refresh: true
body:
- '{"index": {}}'
- '{"date": "2000-01-01"}' # This date is intenationally very far in the past so we end up not being able to use the date_histo -> range -> filters optimization
- '{"index": {}}'
- '{"date": "2000-01-02"}'
- '{"index": {}}'
- '{"date": "2016-02-01"}'
- '{"index": {}}'
- '{"date": "2016-03-01"}'

- do:
search:
index: test_2
body:
size: 0
profile: true
aggs:
histo:
date_histogram:
field: date
calendar_interval: month
- match: { hits.total.value: 4 }
- length: { aggregations.histo.buckets: 195 }
- match: { aggregations.histo.buckets.0.key_as_string: "2000-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

---
"date_histogram run as filters profiler":
- skip:
version: " - 7.99.99"
reason: optimization added in 7.11.0, backport pending

- do:
indices.create:
index: test_2
body:
settings:
number_of_replicas: 0
number_of_shards: 1
mappings:
properties:
date:
type: date

- do:
bulk:
index: test_2
@@ -524,10 +576,13 @@ setup:
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator.FromDateRange }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }
# ultimately this ends up as a filters agg that uses filter by filter collection which is tracked in build_leaf_collector
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 0 }
- match: { profile.shards.0.aggregations.0.debug.delegate: RangeAggregator.FromFilters }
- match: { profile.shards.0.aggregations.0.debug.delegate_debug.delegate: FiltersAggregator.FilterByFilter }
- match: { profile.shards.0.aggregations.0.debug.delegate_debug.delegate_debug.segments_with_deleted_docs: 0 }

---
"histogram with hard bounds":
@@ -590,5 +590,10 @@ public ScoreMode scoreMode() {

@Override
public void preCollection() throws IOException {}

@Override
public Aggregator[] subAggregators() {
throw new UnsupportedOperationException();
}
}
}
@@ -38,10 +38,10 @@
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.test.ESIntegTestCase;
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
@@ -291,6 +291,13 @@ public interface Prepared {
* next rounded value in specified units if possible.
*/
double roundingSize(long utcMillis, DateTimeUnit timeUnit);
/**
* If this rounding mechanism precalculates rounding points then
* this array stores them; each date between two entries rounds down
* to the earlier entry. If the rounding mechanism doesn't precalculate
* points then this is {@code null}.
*/
long[] fixedRoundingPoints();
}
/**
* Prepare to round many times.
@@ -435,6 +442,11 @@ protected Prepared maybeUseArray(long minUtcMillis, long maxUtcMillis, int max)
}
return new ArrayRounding(values, i, this);
}

@Override
public long[] fixedRoundingPoints() {
return null;
}
}

static class TimeUnitRounding extends Rounding {
@@ -1253,6 +1265,12 @@ public long nextRoundingValue(long utcMillis) {
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
return delegatePrepared.roundingSize(utcMillis, timeUnit);
}

@Override
public long[] fixedRoundingPoints() {
// TODO we can likely translate here
return null;
}
};
}

@@ -1335,5 +1353,10 @@ public long nextRoundingValue(long utcMillis) {
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
return delegate.roundingSize(utcMillis, timeUnit);
}

@Override
public long[] fixedRoundingPoints() {
return Arrays.copyOf(values, max);
}
}
}
@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* An {@linkplain Aggregator} that delegates collection to another
* {@linkplain Aggregator} and then translates its results into the results
* you'd expect from another aggregation.
*/
public abstract class AdaptingAggregator extends Aggregator {
private final Aggregator parent;
private final Aggregator delegate;

public AdaptingAggregator(
Aggregator parent,
AggregatorFactories subAggregators,
CheckedFunction<AggregatorFactories, Aggregator, IOException> delegate
) throws IOException {
// It's important we set parent first or else when we build the sub-aggregators they can fail because they'll call this.parent.
this.parent = parent;
/*
* Lock the parent of the sub-aggregators to *this* instead of to
* the delegate. This keeps the parent link shaped like the requested
* agg tree. This is how it has always been and some aggs rely on it.
*/
this.delegate = delegate.apply(subAggregators.fixParent(this));
assert this.delegate.parent() == parent : "invalid parent set on delegate";
}

/**
* Adapt the result from the collecting {@linkplain Aggregator} into the
* result expected by this {@linkplain Aggregator}.
*/
protected abstract InternalAggregation adapt(InternalAggregation delegateResult);

@Override
public final void close() {
delegate.close();
}

@Override
public final ScoreMode scoreMode() {
return delegate.scoreMode();
}

@Override
public final String name() {
return delegate.name();
}

@Override
public final Aggregator parent() {
return parent;
}

@Override
public final Aggregator subAggregator(String name) {
return delegate.subAggregator(name);
}

@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return delegate.getLeafCollector(ctx);
}

@Override
public final void preCollection() throws IOException {
delegate.preCollection();
}

@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = adapt(delegateResults[ordIdx]);
}
return result;
}

@Override
public final InternalAggregation buildEmptyAggregation() {
return adapt(delegate.buildEmptyAggregation());
}

@Override
public final Aggregator[] subAggregators() {
return delegate.subAggregators();
}

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("delegate", InternalAggregationProfileTree.typeFromAggregator(delegate));
Map<String, Object> delegateDebug = new HashMap<>();
delegate.collectDebugInfo(delegateDebug::put);
add.accept("delegate_debug", delegateDebug);
}

public Aggregator delegate() {
return delegate;
}
}
@@ -172,6 +172,11 @@ public final InternalAggregation buildTopLevel() throws IOException {
*/
public void collectDebugInfo(BiConsumer<String, Object> add) {}

/**
* Get the aggregators running under this one.
*/
public abstract Aggregator[] subAggregators();

/** Aggregation mode for sub aggregations. */
public enum SubAggCollectionMode implements Writeable {

@@ -224,6 +224,7 @@ public Aggregator parent() {
return parent;
}

@Override
public Aggregator[] subAggregators() {
return subAggregators;
}
@@ -227,6 +227,27 @@ public int countAggregators() {
return factories.length;
}

/**
* This returns a copy of {@link AggregatorFactories} modified so that
* calls to {@link #createSubAggregators} will ignore the provided parent
* aggregator and always use the {@code fixedParent} provided to this
* method.
* <p>
* {@link AdaptingAggregator} uses this to make sure that sub-aggregators
* get the {@link AdaptingAggregator} aggregator itself as the parent.
*/
public AggregatorFactories fixParent(Aggregator fixedParent) {
AggregatorFactories previous = this;
return new AggregatorFactories(factories) {
@Override
public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality)
throws IOException {
// Note that we're throwing out the "parent" passed in to this method and using the parent passed to fixParent
return previous.createSubAggregators(searchContext, fixedParent, cardinality);
}
};
}

/**
* A mutable collection of {@link AggregationBuilder}s and
* {@link PipelineAggregationBuilder}s.
@@ -119,6 +119,11 @@ public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path)
public BucketComparator bucketComparator(String key, SortOrder order) {
throw new UnsupportedOperationException("Can't sort on deferred aggregations");
}

@Override
public Aggregator[] subAggregators() {
return in.subAggregators();
}
}

}