Reduce the overhead of timeouts and low-level search cancellation. #25776

Merged on Jul 19, 2017 (3 commits).
The diff below shows changes from 2 of the 3 commits.
CancellableBulkScorer.java (new file):
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.internal;

import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.util.Bits;

import java.io.IOException;
import java.util.Objects;

/**
* A {@link BulkScorer} wrapper that runs a {@link Runnable} on a regular basis
* so that the query can be interrupted.
*/
final class CancellableBulkScorer extends BulkScorer {

// we use the BooleanScorer window size as a base interval in order to make sure that we do not
// slow down boolean queries
private static final int INITIAL_INTERVAL = 1 << 11;
[Review comment from a Contributor: "++"]

// No point in having intervals that are larger than 1M
private static final int MAX_INTERVAL = 1 << 20;

private final BulkScorer scorer;
private final Runnable checkCancelled;

CancellableBulkScorer(BulkScorer scorer, Runnable checkCancelled) {
this.scorer = Objects.requireNonNull(scorer);
this.checkCancelled = Objects.requireNonNull(checkCancelled);
}

@Override
public int score(LeafCollector collector, Bits acceptDocs, int min, int max) throws IOException {
int interval = INITIAL_INTERVAL;
while (min < max) {
checkCancelled.run();
final int newMax = (int) Math.min((long) min + interval, max);
min = scorer.score(collector, acceptDocs, min, newMax);
interval = Math.min(interval << 1, MAX_INTERVAL);
}
checkCancelled.run();
return min;
}

@Override
public long cost() {
return scorer.cost();
}

}
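
The score method above runs the cancellation Runnable between scoring windows that start at the BooleanScorer window size (2048 docs) and double until they hit the 1M cap, so the check stays cheap even on large segments. The standalone sketch below is not part of the PR; it only mirrors that windowing arithmetic (all names in it are made up, and it assumes the wrapped scorer always advances to the end of each window) to show how often the Runnable would fire.

/**
 * Illustrative sketch, not part of the PR: mirrors the windowing arithmetic of
 * CancellableBulkScorer#score to count how often checkCancelled would run.
 * Assumes the wrapped scorer always advances to the end of each window, which
 * is the common case; a real BulkScorer may stop earlier.
 */
final class CancellationWindowDemo {

    private static final int INITIAL_INTERVAL = 1 << 11; // 2048, the BooleanScorer window size
    private static final int MAX_INTERVAL = 1 << 20;     // windows are capped at ~1M docs

    public static void main(String[] args) {
        int min = 0;
        final int max = 10_000_000; // pretend the segment has 10M candidate docs
        int interval = INITIAL_INTERVAL;
        int checks = 0;
        while (min < max) {
            checks++; // this is where checkCancelled.run() fires in the real class
            min = (int) Math.min((long) min + interval, max);
            interval = Math.min(interval << 1, MAX_INTERVAL);
        }
        checks++; // the final check after the loop
        // Prints 19: ten doubling windows (2048 .. 1M) cover the first ~2.1M docs,
        // eight 1M-sized windows cover the rest, plus the final check.
        System.out.println("cancellation checks for " + max + " docs: " + checks);
    }
}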

ContextIndexSearcher.java:
@@ -20,14 +20,18 @@
package org.elasticsearch.search.internal;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.Weight;
import org.elasticsearch.common.lease.Releasable;
@@ -40,6 +44,8 @@
import org.elasticsearch.search.profile.query.QueryTimingType;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
* Context-aware extension of {@link IndexSearcher}.
@@ -58,6 +64,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
// TODO revisit moving the profiler to inheritance or wrapping model in the future
private QueryProfiler profiler;

private Runnable checkCancelled;

public ContextIndexSearcher(Engine.Searcher searcher,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
super(searcher.reader());
@@ -76,6 +84,14 @@ public void setProfiler(QueryProfiler profiler) {
this.profiler = profiler;
}

/**
* Set a {@link Runnable} that will be run on a regular basis while
* collecting documents.
*/
public void setCheckCancelled(Runnable checkCancelled) {
this.checkCancelled = checkCancelled;
}

public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
this.aggregatedDfs = aggregatedDfs;
}
@@ -133,6 +149,43 @@ public Weight createWeight(Query query, boolean needsScores, float boost) throws
}
}

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
final Weight cancellableWeight;
if (checkCancelled != null) {
cancellableWeight = new Weight(weight.getQuery()) {

@Override
public void extractTerms(Set<Term> terms) {
throw new UnsupportedOperationException();
}

@Override
public Explanation explain(LeafReaderContext context, int doc) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Scorer scorer(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
BulkScorer in = weight.bulkScorer(context);
if (in != null) {
return new CancellableBulkScorer(in, checkCancelled);
} else {
return null;
}
}
};
} else {
cancellableWeight = weight;
}
super.search(leaves, cancellableWeight, collector);
}

@Override
public Explanation explain(Query query, int doc) throws IOException {
if (aggregatedDfs != null) {
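
With the setCheckCancelled hook and the Weight wrapper in search() above, the query phase only needs to install a single Runnable to get low-level cancellation. A minimal wiring sketch follows; the helper class and method names are assumptions for illustration, and only setCheckCancelled and TaskCancelledException come from the code in this PR.

import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.function.BooleanSupplier;

/**
 * Illustrative helper, not part of the PR: installs a cancellation check on a
 * ContextIndexSearcher so bulk scoring is interrupted between scoring windows.
 */
final class CancellationWiring {

    /**
     * @param searcher    the context-aware searcher used by the query phase
     * @param isCancelled a cheap flag, e.g. backed by a volatile read on the search task
     */
    static void installLowLevelCancellation(ContextIndexSearcher searcher, BooleanSupplier isCancelled) {
        searcher.setCheckCancelled(() -> {
            if (isCancelled.getAsBoolean()) {
                throw new TaskCancelledException("cancelled");
            }
        });
    }
}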

CancellableCollector.java:
@@ -21,58 +21,33 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterCollector;
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.function.BooleanSupplier;

/**
* Collector that checks if the task it is executed under is cancelled.
*/
public class CancellableCollector extends FilterCollector {
private final Provider<Boolean> cancelled;
private final boolean leafLevel;
private final BooleanSupplier cancelled;

/**
* Constructor
* @param cancelled supplier of the cancellation flag, the supplier will be called for each segment if lowLevelCancellation is set
* to false and for each collected record if lowLevelCancellation is set to true. In other words this class assumes
* that the supplier is fast, with performance on the order of a volatile read.
* @param lowLevelCancellation true if collector should check for cancellation for each collected record, false if check should be
* performed only once per segment
* @param cancelled supplier of the cancellation flag, the supplier will be called for each segment
* @param in wrapped collector
*/
public CancellableCollector(Provider<Boolean> cancelled, boolean lowLevelCancellation, Collector in) {
public CancellableCollector(BooleanSupplier cancelled, Collector in) {
super(in);
this.cancelled = cancelled;
this.leafLevel = lowLevelCancellation;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (cancelled.get()) {
if (cancelled.getAsBoolean()) {
throw new TaskCancelledException("cancelled");
}
if (leafLevel) {
return new CancellableLeafCollector(super.getLeafCollector(context));
} else {
return super.getLeafCollector(context);
}
}

private class CancellableLeafCollector extends FilterLeafCollector {
private CancellableLeafCollector(LeafCollector in) {
super(in);
}

@Override
public void collect(int doc) throws IOException {
if (cancelled.get()) {
throw new TaskCancelledException("cancelled");
}
super.collect(doc);
}
return super.getLeafCollector(context);
}
}
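
Since per-document checks now live in CancellableBulkScorer, CancellableCollector only consults the supplier once per segment, when the LeafCollector is pulled. A short usage sketch follows; the import package for CancellableCollector is an assumption, as are the class and method names, and it presumes a cheap boolean flag such as an AtomicBoolean is available.

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TotalHitCountCollector;
import org.elasticsearch.search.query.CancellableCollector; // package assumed for illustration

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Illustrative usage, not part of the PR: per-segment cancellation via
 * CancellableCollector. The supplier is consulted once per segment, not for
 * every collected document.
 */
final class CancellableCollectorExample {

    static int countUnlessCancelled(IndexSearcher searcher, AtomicBoolean cancelled) throws IOException {
        TotalHitCountCollector counter = new TotalHitCountCollector();
        searcher.search(new MatchAllDocsQuery(), new CancellableCollector(cancelled::get, counter));
        return counter.getTotalHits();
    }
}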

QueryCollectorContext.java:
@@ -26,12 +26,9 @@
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
@@ -50,7 +47,6 @@
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TIMEOUT;
import static org.elasticsearch.search.query.TopDocsCollectorContext.shortcutTotalHitCount;

abstract class QueryCollectorContext {
@@ -170,31 +166,14 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
};
}

/**
* Creates a time limiting collector limiting the collection to <code>timeOutMillis</code>ms.
*/
static QueryCollectorContext createTimeoutCollectorContext(Counter timeEstimate, long timeoutMillis) {
return new QueryCollectorContext(REASON_SEARCH_TIMEOUT) {
@Override
Collector create(Collector in) throws IOException {
return new TimeLimitingCollector(in, timeEstimate, timeoutMillis);
}

@Override
boolean shouldCollect() {
return false;
}
};
}

/**
* Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
*/
static QueryCollectorContext createCancellableCollectorContext(Provider<Boolean> cancelled, boolean lowLevelCancellation) {
static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier cancelled) {
return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
@Override
Collector create(Collector in) throws IOException {
return new CancellableCollector(cancelled, lowLevelCancellation, in);
return new CancellableCollector(cancelled, in);
}

@Override
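
The TimeLimitingCollector-based timeout context is removed here, and the PR title suggests timeouts are now enforced through the same low-level check that handles cancellation. The sketch below shows one way such a combined check could be built; it is an assumption rather than code from this PR, and TimeExceededException is a hypothetical stand-in for whatever exception the query phase actually throws on timeout.

import org.elasticsearch.tasks.TaskCancelledException;

import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/**
 * Sketch only, not from the PR: builds one Runnable that enforces both
 * cancellation and the search timeout, suitable for passing to
 * ContextIndexSearcher#setCheckCancelled.
 */
final class CombinedChecks {

    /** Hypothetical stand-in for the exception the query phase uses on timeout. */
    static final class TimeExceededException extends RuntimeException {
    }

    static Runnable timeoutAndCancellation(BooleanSupplier cancelled, LongSupplier relativeTimeMillis, long deadlineMillis) {
        return () -> {
            if (cancelled.getAsBoolean()) {
                throw new TaskCancelledException("cancelled");
            }
            if (relativeTimeMillis.getAsLong() >= deadlineMillis) {
                throw new TimeExceededException();
            }
        };
    }
}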