From 0e6bea90eb859384e9f5794b35c6ffe1954b8a88 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 3 Feb 2022 16:03:03 -1000 Subject: [PATCH 1/5] TSDB: Add time series aggs cancellation Adds support for low-level cancelling time-series based aggregations before they reach the reduce phase. Relates to #74660 --- .../search/SearchCancellationIT.java | 132 +++++++++++++++++- .../search/aggregations/AggregationPhase.java | 35 ++++- .../timeseries/TimeSeriesIndexSearcher.java | 49 ++++++- .../search/internal/CancellableScorer.java | 76 ++++++++++ .../search/query/QueryPhase.java | 2 +- .../aggregations/AggregatorTestCase.java | 4 +- 6 files changed, 289 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java index 9a800c2656c45..5b9177bb315fc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -28,6 +29,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.RestStatus; @@ -36,6 +39,7 @@ import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; +import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.search.lookup.LeafStoredFieldsLookup; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; @@ -43,6 +47,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.TransportService; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -55,9 +60,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.elasticsearch.index.IndexSettings.TIME_SERIES_END_TIME; +import static org.elasticsearch.index.IndexSettings.TIME_SERIES_START_TIME; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; @@ -69,6 +77,8 @@ 
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
 public class SearchCancellationIT extends ESIntegTestCase {
 
+    boolean lowLevelCancellation = randomBoolean();
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Collections.singleton(ScriptedBlockPlugin.class);
@@ -76,7 +86,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
-        boolean lowLevelCancellation = randomBoolean();
         logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal, otherSettings))
@@ -227,7 +236,12 @@ public void testCancellationDuringAggregation() throws Exception {
                             new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap())
                         )
                         .reduceScript(
-                            new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_SCRIPT_NAME, Collections.emptyMap())
+                            new Script(
+                                ScriptType.INLINE,
+                                "mockscript",
+                                ScriptedBlockPlugin.REDUCE_BLOCK_SCRIPT_NAME,
+                                Collections.emptyMap()
+                            )
                         )
                 )
             )
@@ -238,6 +252,89 @@
         ensureSearchWasCancelled(searchResponse);
     }
 
+    public void testCancellationDuringTimeSeriesAggregation() throws Exception {
+        List<ScriptedBlockPlugin> plugins = initBlockFactory();
+        boolean blockInReduce = false;
+        int numberOfShards = between(2, 5);
+        long now = Instant.now().toEpochMilli();
+        assertAcked(
+            prepareCreate("test").setSettings(
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name())
+                    .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim")
+                    .put(TIME_SERIES_START_TIME.getKey(), now)
+                    .put(TIME_SERIES_END_TIME.getKey(), now + 101)
+                    .build()
+            ).setMapping("""
+                {
+                    "properties": {
+                        "@timestamp": {"type": "date", "format": "epoch_millis"},
+                        "dim": {"type": "keyword", "time_series_dimension": true}
+                    }
+                }
+                """)
+        );
+
+        for (int i = 0; i < 5; i++) {
+            // Make sure we have a few segments
+            BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            for (int j = 0; j < 20; j++) {
+                bulkRequestBuilder.add(
+                    client().prepareIndex("test")
+                        .setOpType(DocWriteRequest.OpType.CREATE)
+                        .setSource("@timestamp", now + i * 20 + j, "val", (double) j, "dim", String.valueOf(i))
+                );
+            }
+            assertNoFailures(bulkRequestBuilder.get());
+        }
+
+        logger.info("Executing search");
+        TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg");
+        ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
+            .setQuery(matchAllQuery())
+            .addAggregation(
+                timeSeriesAggregationBuilder.subAggregation(
+                    new ScriptedMetricAggregationBuilder("sub_agg").initScript(
+                        new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap())
+                    )
+                        .mapScript(
+                            new Script(
+                                ScriptType.INLINE,
+                                "mockscript",
+                                blockInReduce ? ScriptedBlockPlugin.MAP_SCRIPT_NAME : ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME,
+                                Collections.emptyMap()
+                            )
+                        )
+                        .combineScript(
+                            new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap())
+                        )
+                        .reduceScript(
+                            new Script(
+                                ScriptType.INLINE,
+                                "mockscript",
+                                blockInReduce ? ScriptedBlockPlugin.REDUCE_BLOCK_SCRIPT_NAME : ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME,
+                                Collections.emptyMap()
+                            )
+                        )
+                )
+            )
+            .execute();
+        awaitForBlock(plugins);
+        cancelSearch(SearchAction.NAME);
+        disableBlocks(plugins);
+
+        SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, searchResponse::actionGet);
+        assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST));
+        logger.info("All shards failed with", ex);
+        if (lowLevelCancellation) {
+            // Ensure that we cancelled in LeafWalker and not in reduce phase
+            assertThat(ExceptionsHelper.stackTrace(ex), containsString("LeafWalker"));
+        }
+
+    }
+
     public void testCancellationOfScrollSearches() throws Exception {
 
         List<ScriptedBlockPlugin> plugins = initBlockFactory();
@@ -414,8 +511,11 @@ public static class ScriptedBlockPlugin extends MockScriptPlugin {
         static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block";
         static final String INIT_SCRIPT_NAME = "init";
         static final String MAP_SCRIPT_NAME = "map";
+        static final String MAP_BLOCK_SCRIPT_NAME = "map_block";
         static final String COMBINE_SCRIPT_NAME = "combine";
         static final String REDUCE_SCRIPT_NAME = "reduce";
+        static final String REDUCE_FAIL_SCRIPT_NAME = "reduce_fail";
+        static final String REDUCE_BLOCK_SCRIPT_NAME = "reduce_block";
         static final String TERM_SCRIPT_NAME = "term";
 
         private final AtomicInteger hits = new AtomicInteger();
@@ -449,10 +549,16 @@ public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
                 this::nullScript,
                 MAP_SCRIPT_NAME,
                 this::nullScript,
+                MAP_BLOCK_SCRIPT_NAME,
+                this::mapBlockScript,
                 COMBINE_SCRIPT_NAME,
                 this::nullScript,
-                REDUCE_SCRIPT_NAME,
+                REDUCE_BLOCK_SCRIPT_NAME,
                 this::blockScript,
+                REDUCE_SCRIPT_NAME,
+                this::termScript,
+                REDUCE_FAIL_SCRIPT_NAME,
+                this::reduceFailScript,
                 TERM_SCRIPT_NAME,
                 this::termScript
             );
@@ -474,6 +580,11 @@ private Object searchBlockScript(Map<String, Object> params) {
             return true;
         }
 
+        private Object reduceFailScript(Map<String, Object> params) {
+            fail("Shouldn't reach reduce");
+            return true;
+        }
+
         private Object nullScript(Map<String, Object> params) {
             return null;
         }
@@ -493,6 +604,21 @@ private Object blockScript(Map<String, Object> params) {
             return 42;
         }
 
+        private Object mapBlockScript(Map<String, Object> params) {
+            final Runnable runnable = beforeExecution.get();
+            if (runnable != null) {
+                runnable.run();
+            }
+            LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map");
+            hits.incrementAndGet();
+            try {
+                assertBusy(() -> assertFalse(shouldBlock.get()));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return 1;
+        }
+
         private Object termScript(Map<String, Object> params) {
             return 1;
         }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
index ffcc971eeda7a..ce28ab0499d54 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -8,11 +8,14 @@
 package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.search.Collector;
+import org.elasticsearch.action.search.SearchShardTask;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.profile.query.CollectorResult;
 import org.elasticsearch.search.profile.query.InternalProfileCollector;
+import org.elasticsearch.search.query.QueryPhase;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +43,7 @@ public void preProcess(SearchContext context) {
             }
             if (context.aggregations().factories().context() != null
                 && context.aggregations().factories().context().isInSortOrderExecutionRequired()) {
-                TimeSeriesIndexSearcher searcher = new TimeSeriesIndexSearcher(context.searcher());
+                TimeSeriesIndexSearcher searcher = new TimeSeriesIndexSearcher(context.searcher(), getCancellationChecks(context));
                 try {
                     searcher.search(context.rewrittenQuery(), bucketCollector);
                 } catch (IOException e) {
@@ -55,6 +58,36 @@
         }
     }
 
+    private List<Runnable> getCancellationChecks(SearchContext context) {
+        List<Runnable> cancellationChecks = new ArrayList<>();
+        if (context.lowLevelCancellation()) {
+            // This searching doesn't live beyond this phase, so we don't need to remove query cancellation
+            cancellationChecks.add(() -> {
+                final SearchShardTask task = context.getTask();
+                if (task != null) {
+                    task.ensureNotCancelled();
+                }
+            });
+        }
+
+        boolean timeoutSet = context.scrollContext() == null
+            && context.timeout() != null
+            && context.timeout().equals(SearchService.NO_TIMEOUT) == false;
+
+        if (timeoutSet) {
+            final long startTime = context.getRelativeTimeInMillis();
+            final long timeout = context.timeout().millis();
+            final long maxTime = startTime + timeout;
+            cancellationChecks.add(() -> {
+                final long time = context.getRelativeTimeInMillis();
+                if (time > maxTime) {
+                    throw new QueryPhase.TimeExceededException();
+                }
+            });
+        }
+        return cancellationChecks;
+    }
+
     public void execute(SearchContext context) {
         if (context.aggregations() == null) {
             context.queryResult().aggregations(null);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
index 327da1340ec04..a66542e6141ce 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
@@ -12,7 +12,9 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.BulkScorer;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
@@ -25,8 +27,10 @@
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.internal.CancellableScorer;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * An IndexSearcher wrapper that executes the searches in time-series indices by traversing them by tsid and timestamp
@@ -37,14 +41,16 @@ public class TimeSeriesIndexSearcher {
 
     // We need to delegate to the other searcher here as opposed to extending IndexSearcher and inheriting default implementations as the
     // IndexSearcher would most of the time be a ContextIndexSearcher that has important logic related to e.g. document-level security.
     private final IndexSearcher searcher;
+    private final List<Runnable> cancellations;
 
-    public TimeSeriesIndexSearcher(IndexSearcher searcher) {
+    public TimeSeriesIndexSearcher(IndexSearcher searcher, List<Runnable> cancellations) {
         this.searcher = searcher;
+        this.cancellations = cancellations;
     }
 
     public void search(Query query, BucketCollector bucketCollector) throws IOException {
         query = searcher.rewrite(query);
-        Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
+        Weight weight = wrapWeight(searcher.createWeight(query, bucketCollector.scoreMode(), 1));
         PriorityQueue<LeafWalker> queue = new PriorityQueue<>(searcher.getIndexReader().leaves().size()) {
             @Override
             protected boolean lessThan(LeafWalker a, LeafWalker b) {
@@ -121,4 +127,43 @@ boolean next() throws IOException {
             return false;
         }
     }
+
+    private void checkCancelled() {
+        for (Runnable r : cancellations) {
+            r.run();
+        }
+    }
+
+    private Weight wrapWeight(Weight weight) {
+        if (cancellations.isEmpty() == false) {
+            return new Weight(weight.getQuery()) {
+                @Override
+                public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public boolean isCacheable(LeafReaderContext ctx) {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public Scorer scorer(LeafReaderContext context) throws IOException {
+                    Scorer scorer = weight.scorer(context);
+                    if (scorer != null) {
+                        return new CancellableScorer(scorer, () -> checkCancelled());
+                    } else {
+                        return null;
+                    }
+                }
+
+                @Override
+                public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
+                    return weight.bulkScorer(context);
+                }
+            };
+        } else {
+            return weight;
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java
new file mode 100644
index 0000000000000..44c3ea7f6a4fb
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.internal;
+
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Scorer;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A wrapper around scorer that executes checkCancelled on each document access.
+ *
+ * The main purpose of this class is to allow cancellation of search requests. Note that this class doesn't wrap bulk scorer, for that
+ * use {@link CancellableBulkScorer} instead.
+ */ +public class CancellableScorer extends Scorer { + private final Scorer scorer; + private final Runnable checkCancelled; + + public CancellableScorer(Scorer scorer, Runnable checkCancelled) { + super(scorer.getWeight()); + this.scorer = Objects.requireNonNull(scorer); + this.checkCancelled = Objects.requireNonNull(checkCancelled); + } + + @Override + public float score() throws IOException { + return scorer.score(); + } + + @Override + public int docID() { + return scorer.docID(); + } + + @Override + public DocIdSetIterator iterator() { + DocIdSetIterator iterator = scorer.iterator(); + return new DocIdSetIterator() { + @Override + public int docID() { + return iterator.docID(); + } + + @Override + public int nextDoc() throws IOException { + checkCancelled.run(); + return iterator.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + checkCancelled.run(); + return iterator.advance(target); + } + + @Override + public long cost() { + return iterator.cost(); + } + }; + } + + @Override + public float getMaxScore(int upTo) throws IOException { + checkCancelled.run(); + return scorer.getMaxScore(upTo); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 0b72df78a510f..937378719ff81 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -267,5 +267,5 @@ private static boolean canEarlyTerminate(IndexReader reader, SortAndFormats sort return true; } - static class TimeExceededException extends RuntimeException {} + public static class TimeExceededException extends RuntimeException {} } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 36062d69ca401..9b5b6a4bcd002 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -577,7 +577,7 @@ private A searchAndReduce( C a = createAggregator(builder, context); a.preCollection(); if (context.isInSortOrderExecutionRequired()) { - new TimeSeriesIndexSearcher(subSearcher).search(rewritten, a); + new TimeSeriesIndexSearcher(subSearcher, List.of()).search(rewritten, a); } else { Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f); subSearcher.search(weight, a); @@ -588,7 +588,7 @@ private A searchAndReduce( } else { root.preCollection(); if (context.isInSortOrderExecutionRequired()) { - new TimeSeriesIndexSearcher(searcher).search(rewritten, MultiBucketCollector.wrap(true, List.of(root))); + new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root))); } else { searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root))); } From 025dc5ed6905f0fdb22d2bd5acb8fc0f4124d904 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 7 Feb 2022 12:19:40 -1000 Subject: [PATCH 2/5] Fix lowLevelCancellation flag initialization --- .../search/SearchCancellationIT.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java index 
5b9177bb315fc..e78ba8b7a0469 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.TransportService; +import org.junit.BeforeClass; import java.time.Instant; import java.util.ArrayList; @@ -77,7 +78,12 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) public class SearchCancellationIT extends ESIntegTestCase { - boolean lowLevelCancellation = randomBoolean(); + private static boolean lowLevelCancellation; + + @BeforeClass + public static void init() { + lowLevelCancellation = randomBoolean(); + } @Override protected Collection> nodePlugins() { @@ -594,7 +600,9 @@ private Object blockScript(Map params) { if (runnable != null) { runnable.run(); } - LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce"); + if (shouldBlock.get()) { + LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce"); + } hits.incrementAndGet(); try { assertBusy(() -> assertFalse(shouldBlock.get())); @@ -609,7 +617,9 @@ private Object mapBlockScript(Map params) { if (runnable != null) { runnable.run(); } - LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map"); + if (shouldBlock.get()) { + LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map"); + } hits.incrementAndGet(); try { assertBusy(() -> assertFalse(shouldBlock.get())); From 403faf491c6b1c306c97128fe5e3c4866aee1e9c Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 9 Feb 2022 17:28:54 -1000 Subject: [PATCH 3/5] Don't check for cancellation on every record --- .../elasticsearch/search/SearchCancellationIT.java | 12 ++++++++---- .../search/internal/CancellableScorer.java | 11 +++++++++-- .../timeseries/TimeSeriesIndexSearcherTests.java | 5 +---- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java index e78ba8b7a0469..6383e6d169432 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java @@ -263,6 +263,10 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { boolean blockInReduce = false; int numberOfShards = between(2, 5); long now = Instant.now().toEpochMilli(); + int numberOfRefreshes = between(2, 5); + // need to make sure we hit the low level check that happens only every 1024 docs, so we need to make sure that we have at + // least 1024 docs on the shard that we are blocked on otherwise we might never hit the low level cancellation there. 
+ int numberOfDocsPerRefresh = numberOfShards * between(1500, 2000); assertAcked( prepareCreate("test").setSettings( Settings.builder() @@ -271,7 +275,7 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name()) .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim") .put(TIME_SERIES_START_TIME.getKey(), now) - .put(TIME_SERIES_END_TIME.getKey(), now + 101) + .put(TIME_SERIES_END_TIME.getKey(), now + (long) numberOfRefreshes * numberOfDocsPerRefresh + 1) .build() ).setMapping(""" { @@ -283,14 +287,14 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { """) ); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < numberOfRefreshes; i++) { // Make sure we have a few segments BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int j = 0; j < 20; j++) { + for (int j = 0; j < numberOfDocsPerRefresh; j++) { bulkRequestBuilder.add( client().prepareIndex("test") .setOpType(DocWriteRequest.OpType.CREATE) - .setSource("@timestamp", now + i * 20 + j, "val", (double) j, "dim", String.valueOf(i)) + .setSource("@timestamp", now + (long) i * numberOfDocsPerRefresh + j, "val", (double) j, "dim", String.valueOf(i)) ); } assertNoFailures(bulkRequestBuilder.get()); diff --git a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java index 44c3ea7f6a4fb..9fbb8d4c6bce1 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java @@ -21,8 +21,11 @@ * use {@link CancellableBulkScorer} instead. 
*/ public class CancellableScorer extends Scorer { + private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; + private final Scorer scorer; private final Runnable checkCancelled; + private int seen = 0; public CancellableScorer(Scorer scorer, Runnable checkCancelled) { super(scorer.getWeight()); @@ -51,13 +54,17 @@ public int docID() { @Override public int nextDoc() throws IOException { - checkCancelled.run(); + if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { + checkCancelled.run(); + } return iterator.nextDoc(); } @Override public int advance(int target) throws IOException { - checkCancelled.run(); + if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { + checkCancelled.run(); + } return iterator.advance(target); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java index 946109cb6a22e..9adb6ef1e0c63 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java @@ -88,10 +88,7 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx IndexReader reader = DirectoryReader.open(dir); IndexSearcher searcher = new IndexSearcher(reader); - TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of() - - - ); + TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); BucketCollector collector = new BucketCollector() { From c15b2f4d728c1648c0d24eedac336d481c245beb Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 10 Feb 2022 10:34:17 -1000 Subject: [PATCH 4/5] Address review comment --- .../aggregations/timeseries/TimeSeriesIndexSearcher.java | 4 ++-- .../org/elasticsearch/search/internal/CancellableScorer.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java index 56af7f7ef1b32..9bed9fdb9e627 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java @@ -146,12 +146,12 @@ private Weight wrapWeight(Weight weight) { return new Weight(weight.getQuery()) { @Override public Explanation explain(LeafReaderContext context, int doc) throws IOException { - throw new UnsupportedOperationException(); + return weight.explain(context, doc); } @Override public boolean isCacheable(LeafReaderContext ctx) { - throw new UnsupportedOperationException(); + return weight.isCacheable(ctx); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java index 9fbb8d4c6bce1..7232b762eb904 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java @@ -18,7 +18,8 @@ * A wrapper around scorer that executes checkCancelled on each document access. * * The main purpose of this class is to allow cancellation of search requests. 
Note that this class doesn't wrap bulk scorer, for that - * use {@link CancellableBulkScorer} instead. + * use {@link CancellableBulkScorer} instead. We have to extend Scorer here instead of extending a much more convenient FilterScorer + * because we need to override some methods that are marked as final there. */ public class CancellableScorer extends Scorer { private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; From e3417ebdb5a575a3198de397b0b1ba8de9e8a9a5 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 14 Feb 2022 11:11:16 -1000 Subject: [PATCH 5/5] Simplify the cancellation code and clean up the tests --- .../search/SearchCancellationIT.java | 27 +--- .../timeseries/TimeSeriesIndexSearcher.java | 46 ++----- .../search/internal/CancellableScorer.java | 84 ------------ .../TimeSeriesCancellationTests.java | 128 ++++++++++++++++++ 4 files changed, 144 insertions(+), 141 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java index 6383e6d169432..465c394403bef 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java @@ -260,13 +260,10 @@ public void testCancellationDuringAggregation() throws Exception { public void testCancellationDuringTimeSeriesAggregation() throws Exception { List plugins = initBlockFactory(); - boolean blockInReduce = false; int numberOfShards = between(2, 5); long now = Instant.now().toEpochMilli(); - int numberOfRefreshes = between(2, 5); - // need to make sure we hit the low level check that happens only every 1024 docs, so we need to make sure that we have at - // least 1024 docs on the shard that we are blocked on otherwise we might never hit the low level cancellation there. - int numberOfDocsPerRefresh = numberOfShards * between(1500, 2000); + int numberOfRefreshes = between(1, 5); + int numberOfDocsPerRefresh = numberOfShards * between(1500, 2000) / numberOfRefreshes; assertAcked( prepareCreate("test").setSettings( Settings.builder() @@ -288,7 +285,7 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { ); for (int i = 0; i < numberOfRefreshes; i++) { - // Make sure we have a few segments + // Make sure we sometimes have a few segments BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int j = 0; j < numberOfDocsPerRefresh; j++) { bulkRequestBuilder.add( @@ -310,23 +307,13 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap()) ) .mapScript( - new Script( - ScriptType.INLINE, - "mockscript", - blockInReduce ? 
ScriptedBlockPlugin.MAP_SCRIPT_NAME : ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, - Collections.emptyMap() - ) + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, Collections.emptyMap()) ) .combineScript( new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap()) ) .reduceScript( - new Script( - ScriptType.INLINE, - "mockscript", - blockInReduce ? ScriptedBlockPlugin.REDUCE_BLOCK_SCRIPT_NAME : ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, - Collections.emptyMap() - ) + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, Collections.emptyMap()) ) ) ) @@ -339,8 +326,8 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST)); logger.info("All shards failed with", ex); if (lowLevelCancellation) { - // Ensure that we cancelled in LeafWalker and not in reduce phase - assertThat(ExceptionsHelper.stackTrace(ex), containsString("LeafWalker")); + // Ensure that we cancelled in TimeSeriesIndexSearcher and not in reduce phase + assertThat(ExceptionsHelper.stackTrace(ex), containsString("TimeSeriesIndexSearcher")); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java index 0efabc290dac0..71ccf96fd6bc2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java @@ -12,9 +12,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.BulkScorer; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.Explanation; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Query; @@ -28,7 +26,6 @@ import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.internal.CancellableScorer; import java.io.IOException; import java.util.ArrayList; @@ -40,6 +37,7 @@ * TODO: Convert it to use index sort instead of hard-coded tsid and timestamp values */ public class TimeSeriesIndexSearcher { + private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; // We need to delegate to the other searcher here as opposed to extending IndexSearcher and inheriting default implementations as the // IndexSearcher would most of the time be a ContextIndexSearcher that has important logic related to e.g. document-level security. 
@@ -52,12 +50,16 @@ public TimeSeriesIndexSearcher(IndexSearcher searcher, List cancellati } public void search(Query query, BucketCollector bucketCollector) throws IOException { + int seen = 0; query = searcher.rewrite(query); - Weight weight = wrapWeight(searcher.createWeight(query, bucketCollector.scoreMode(), 1)); + Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1); // Create LeafWalker for each subreader List leafWalkers = new ArrayList<>(); for (LeafReaderContext leaf : searcher.getIndexReader().leaves()) { + if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { + checkCancelled(); + } LeafBucketCollector leafCollector = bucketCollector.getLeafCollector(leaf); Scorer scorer = weight.scorer(leaf); if (scorer != null) { @@ -81,6 +83,9 @@ protected boolean lessThan(LeafWalker a, LeafWalker b) { // walkers are ordered by timestamp. while (populateQueue(leafWalkers, queue)) { do { + if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { + checkCancelled(); + } LeafWalker walker = queue.top(); walker.collectCurrent(); if (walker.nextDoc() == DocIdSetIterator.NO_MORE_DOCS || walker.shouldPop()) { @@ -142,39 +147,6 @@ private void checkCancelled() { } } - private Weight wrapWeight(Weight weight) { - if (cancellations.isEmpty() == false) { - return new Weight(weight.getQuery()) { - @Override - public Explanation explain(LeafReaderContext context, int doc) throws IOException { - return weight.explain(context, doc); - } - - @Override - public boolean isCacheable(LeafReaderContext ctx) { - return weight.isCacheable(ctx); - } - - @Override - public Scorer scorer(LeafReaderContext context) throws IOException { - Scorer scorer = weight.scorer(context); - if (scorer != null) { - return new CancellableScorer(scorer, () -> checkCancelled()); - } else { - return null; - } - } - - @Override - public BulkScorer bulkScorer(LeafReaderContext context) throws IOException { - return weight.bulkScorer(context); - } - }; - } else { - return weight; - } - } - private static class LeafWalker { private final LeafCollector collector; private final Bits liveDocs; diff --git a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java b/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java deleted file mode 100644 index 7232b762eb904..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/internal/CancellableScorer.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.search.internal; - -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.Scorer; - -import java.io.IOException; -import java.util.Objects; - -/** - * A wrapper around scorer that executes checkCancelled on each document access. - * - * The main purpose of this class is to allow cancellation of search requests. Note that this class doesn't wrap bulk scorer, for that - * use {@link CancellableBulkScorer} instead. We have to extend Scorer here instead of extending a much more convenient FilterScorer - * because we need to override some methods that are marked as final there. 
- */ -public class CancellableScorer extends Scorer { - private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; - - private final Scorer scorer; - private final Runnable checkCancelled; - private int seen = 0; - - public CancellableScorer(Scorer scorer, Runnable checkCancelled) { - super(scorer.getWeight()); - this.scorer = Objects.requireNonNull(scorer); - this.checkCancelled = Objects.requireNonNull(checkCancelled); - } - - @Override - public float score() throws IOException { - return scorer.score(); - } - - @Override - public int docID() { - return scorer.docID(); - } - - @Override - public DocIdSetIterator iterator() { - DocIdSetIterator iterator = scorer.iterator(); - return new DocIdSetIterator() { - @Override - public int docID() { - return iterator.docID(); - } - - @Override - public int nextDoc() throws IOException { - if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { - checkCancelled.run(); - } - return iterator.nextDoc(); - } - - @Override - public int advance(int target) throws IOException { - if (++seen % CHECK_CANCELLED_SCORER_INTERVAL == 0) { - checkCancelled.run(); - } - return iterator.advance(target); - } - - @Override - public long cost() { - return iterator.cost(); - } - }; - } - - @Override - public float getMaxScore(int upTo) throws IOException { - checkCancelled.run(); - return scorer.getMaxScore(upTo); - } -} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java new file mode 100644 index 0000000000000..b66db7736a7ff --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ +package org.elasticsearch.search.aggregations.timeseries; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.test.ESTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.equalTo; + +public class TimeSeriesCancellationTests extends ESTestCase { + + private static Directory dir; + private static IndexReader reader; + + @BeforeClass + public static void setup() throws IOException { + dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setIndexSort( + new Sort( + new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING), + new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG) + ) + ); + RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc); + indexRandomDocuments(iw, randomIntBetween(2048, 4096)); + iw.flush(); + reader = iw.getReader(); + iw.close(); + } + + private static void indexRandomDocuments(RandomIndexWriter w, int numDocs) throws IOException { + for (int i = 1; i <= numDocs; ++i) { + Document doc = new Document(); + String tsid = "tsid" + randomIntBetween(0, 30); + long time = randomNonNegativeLong(); + doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid))); + doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, time)); + w.addDocument(doc); + } + } + + @AfterClass + public static void cleanup() throws IOException { + IOUtils.close(reader, dir); + dir = null; + reader = null; + } + + public void testLowLevelCancellationActions() throws IOException { + ContextIndexSearcher searcher = new ContextIndexSearcher( + reader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true + ); + TimeSeriesIndexSearcher timeSeriesIndexSearcher = new TimeSeriesIndexSearcher( + searcher, + List.of(() -> { throw new TaskCancelledException("Cancel"); }) + ); + CountingBucketCollector bc = new CountingBucketCollector(); + expectThrows(TaskCancelledException.class, () -> timeSeriesIndexSearcher.search(new MatchAllDocsQuery(), bc)); + // We count every segment and every record as 1 and break on 2048th iteration counting from 0 + // so we expect to see 2048 - number_of_segments - 1 (-1 is because we check before we collect) + assertThat(bc.count.get(), equalTo(Math.max(0, 2048 - 
reader.leaves().size() - 1))); + } + + public static class CountingBucketCollector extends BucketCollector { + public AtomicInteger count = new AtomicInteger(); + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + count.incrementAndGet(); + } + }; + } + + @Override + public void preCollection() throws IOException { + + } + + @Override + public void postCollection() throws IOException { + + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + } +}
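---

A note on the mechanism that survives into the final revision, for readers following the series: patch 5 drops the Scorer wrapping entirely, and TimeSeriesIndexSearcher instead runs the cancellation hooks inline, once every CHECK_CANCELLED_SCORER_INTERVAL (1 << 11 = 2048) iterations of the tsid/timestamp walk. The hooks are plain Runnables that throw to abort: SearchShardTask#ensureNotCancelled throws TaskCancelledException, and the timeout hook throws QueryPhase.TimeExceededException. The sketch below is a minimal, self-contained illustration of that amortized-check pattern; it is not part of the patch, and PeriodicCancellationSketch, CancelledException, and walk() are illustrative stand-ins for TimeSeriesIndexSearcher, the real exceptions, and the LeafWalker priority-queue loop.

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class PeriodicCancellationSketch {
    // Matches CHECK_CANCELLED_SCORER_INTERVAL in the patch: 1 << 11 == 2048
    private static final int CHECK_CANCELLED_INTERVAL = 1 << 11;

    // Stand-in for TaskCancelledException / QueryPhase.TimeExceededException
    static class CancelledException extends RuntimeException {}

    private final List<Runnable> cancellations; // each hook throws when the search must stop
    private int seen = 0;

    PeriodicCancellationSketch(List<Runnable> cancellations) {
        this.cancellations = cancellations;
    }

    private void checkCancelled() {
        // Each hook either returns normally or throws to unwind out of the walk
        for (Runnable check : cancellations) {
            check.run();
        }
    }

    // Stand-in for the LeafWalker loop: the common path costs one increment and
    // one branch per document; the hooks only run on every 2048th document.
    void walk(int numDocs) {
        for (int doc = 0; doc < numDocs; doc++) {
            if (++seen % CHECK_CANCELLED_INTERVAL == 0) {
                checkCancelled();
            }
            // ... collect(doc) ...
        }
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<Runnable> checks = List.of(() -> {
            if (cancelled.get()) {
                throw new CancelledException();
            }
        });
        PeriodicCancellationSketch walker = new PeriodicCancellationSketch(checks);
        walker.walk(10_000); // completes: nothing is cancelled yet
        cancelled.set(true);
        try {
            walker.walk(10_000);
        } catch (CancelledException e) {
            // Aborts within the next 2048 iterations, mirroring the bounded
            // cancellation latency TimeSeriesCancellationTests asserts above
            System.out.println("cancelled after " + walker.seen + " docs seen");
        }
    }
}

In the series itself, the two hooks installed by AggregationPhase#getCancellationChecks play the role of the lambda above: one calls SearchShardTask#ensureNotCancelled when low-level cancellation is enabled, and the other throws QueryPhase.TimeExceededException once the relative clock passes the computed timeout deadline.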