Skip to content

Commit

Permalink
Shard Search Scroll failures consistency (#62061)
Browse files Browse the repository at this point in the history
Today some uncaught shard failures such as RejectedExecutionException skips the release of shard context
and let subsequent scroll requests access the same shard context again. Depending on how the other shards advanced,
this behavior can lead to missing data since scrolls always move forward.
In order to avoid hidden data loss, this commit ensures that we always release the context of shard search scroll requests whenever a failure
occurs locally. The shard search context will no longer exist in subsequent scroll requests which will lead to consistent shard failures
in the responses.
This change also modifies the retry tests of the reindex feature. Reindex retries scroll search request that contains a shard failure and
move on whenever the failure disappears. That is not compatible with how scrolls work and can lead to missing data as explained above.
That means that reindex will now report scroll failures when search rejection happen during the operation instead of skipping document
silently.
Finally this change removes an old TODO that was fulfilled with #61062.
  • Loading branch information
jimczi authored and dnhatn committed Sep 10, 2020
1 parent 4d528e9 commit 3fc35aa
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static org.hamcrest.Matchers.hasSize;

/**
* Integration test for retry behavior. Useful because retrying relies on the way that the
* Integration test for bulk retry behavior. Useful because retrying relies on the way that the
* rest of Elasticsearch throws exceptions and unit tests won't verify that.
*/
public class RetryTests extends ESIntegTestCase {
Expand Down Expand Up @@ -84,7 +84,7 @@ protected Collection<Class<? extends Plugin>> transportClientPlugins() {
}

/**
* Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
* Lower the queue sizes to be small enough that bulk will time out and have to be retried.
*/
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand Down Expand Up @@ -152,22 +152,15 @@ private void testCase(
BulkIndexByScrollResponseMatcher matcher)
throws Exception {
/*
* These test cases work by stuffing the search and bulk queues of a single node and
* making sure that we read and write from that node. Because of some "fun" with the
* way that searches work, we need at least one more node to act as the coordinating
* node for the search request. If we didn't do this then the searches would get stuck
* in the queue anyway because we force queue portions of the coordinating node's
* actions. This is not a big deal in normal operations but a real pain when you are
* intentionally stuffing queues hoping for a failure.
* These test cases work by stuffing the bulk queue of a single node and
* making sure that we read and write from that node.
*/

final Settings nodeSettings = Settings.builder()
// use pools of size 1 so we can block them
.put("thread_pool.write.size", 1)
.put("thread_pool.search.size", 1)
// use queues of size 1 because size 0 is broken and because search requests need the queue to function
// use queues of size 1 because size 0 is broken and because bulk requests need the queue to function
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.search.queue_size", 1)
.put("node.attr.color", "blue")
.build();
final String node = internalCluster().startDataOnlyNode(nodeSettings);
Expand All @@ -193,45 +186,25 @@ private void testCase(
assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
client().admin().indices().prepareRefresh("source").get();

logger.info("Blocking search");
CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node);

AbstractBulkByScrollRequestBuilder<?, ?> builder = request.apply(internalCluster().masterClient());
// Make sure we use more than one batch so we have to scroll
builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10));

logger.info("Blocking bulk so we start to get bulk rejections");
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);

logger.info("Starting request");
ActionFuture<BulkByScrollResponse> responseListener = builder.execute();

try {
logger.info("Waiting for search rejections on the initial search");
assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));

logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
initialSearchBlock.await();

logger.info("Waiting for bulk rejections");
assertBusy(() -> assertThat(taskStatus(action).getBulkRetries(), greaterThan(0L)));

// Keep a copy of the current number of search rejections so we can assert that we get more when we block the scroll
long initialSearchRejections = taskStatus(action).getSearchRetries();

logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll");
CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
bulkBlock.await();

logger.info("Waiting for search rejections for the scroll");
assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(initialSearchRejections)));

logger.info("Unblocking the scroll");
scrollBlock.await();

logger.info("Waiting for the request to finish");
BulkByScrollResponse response = responseListener.get();
assertThat(response, matcher);
assertThat(response.getBulkRetries(), greaterThan(0L));
assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections));
} finally {
// Fetch the response just in case we blew up half way through. This will make sure the failure is thrown up to the top level.
BulkByScrollResponse response = responseListener.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,6 @@
* run separate fetch phases etc.
*/
abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements Runnable {
/*
* Some random TODO:
* Today we still have a dedicated executing mode for scrolls while we could simplify this by implementing
* scroll like functionality (mainly syntactic sugar) as an ordinary search with search_after. We could even go further and
* make the scroll entirely stateless and encode the state per shard in the scroll ID.
*
* Today we also hold a context per shard but maybe
* we want the context per coordinating node such that we route the scroll to the same coordinator all the time and hold the context
* here? This would have the advantage that if we loose that node the entire scroll is deal not just one shard.
*
* Additionally there is the possibility to associate the scroll with a seq. id. such that we can talk to any replica as long as
* the shards engine hasn't advanced that seq. id yet. Such a resume is possible and best effort, it could be even a safety net since
* if you rely on indices being read-only things can change in-between without notification or it's hard to detect if there where any
* changes while scrolling. These are all options to improve the current situation which we can look into down the road
*/
protected final Logger logger;
protected final ActionListener<SearchResponse> listener;
protected final ParsedScrollId scrollId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public abstract class Engine implements Closeable {
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum?
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";

protected final ShardId shardId;
protected final String allocationId;
Expand Down
Loading

0 comments on commit 3fc35aa

Please sign in to comment.