Skip to content

Commit

Permalink
Peer recovery should flush at the end (#41660)
Browse files Browse the repository at this point in the history
Flushing at the end of a peer recovery (if needed) can bring these
benefits:

1. Closing an index won't end up with the red state for a recovering
replica should always be ready for closing whether it performs the
verifying-before-close step or not.

2. Good opportunities to compact store (i.e., flushing and merging
Lucene, and trimming translog)

Closes #40024
Closes #39588
  • Loading branch information
dnhatn authored May 22, 2019
1 parent 5785941 commit 75be2a6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -298,11 +300,19 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
// Persist the global checkpoint.
indexShard.sync();
indexShard.persistRetentionLeases();
if (hasUncommittedOperations()) {
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
indexShard.finalizeRecovery();
return null;
});
}

private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0;
}

@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
indexShard.activateWithPrimaryContext(primaryContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
Expand All @@ -52,9 +53,12 @@
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
Expand Down Expand Up @@ -84,14 +88,19 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -910,6 +919,39 @@ public void testDoNotInfinitelyWaitForMapping() {
assertHitCount(client().prepareSearch().get(), numDocs);
}

public void testRecoveryFlushReplica() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
String indexName = "test-index";
createIndex(indexName, Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", 1).build());
int numDocs = randomIntBetween(0, 10);
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs)
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.number_of_replicas", 1)));
ensureGreen(indexName);
ShardId shardId = null;
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
shardId = shardStats.getShardRouting().shardId();
if (shardStats.getShardRouting().primary() == false) {
assertThat(shardStats.getCommitStats().getNumDocs(), equalTo(numDocs));
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
shardStats.getCommitStats().getUserData().entrySet());
assertThat(commitInfo.localCheckpoint, equalTo(shardStats.getSeqNoStats().getLocalCheckpoint()));
assertThat(commitInfo.maxSeqNo, equalTo(shardStats.getSeqNoStats().getMaxSeqNo()));
}
}
SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(indexName).get().failedShards(), equalTo(0)));
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.number_of_replicas", 2)));
ensureGreen(indexName);
// Recovery should keep syncId if no indexing activity on the primary after synced-flush.
Set<String> syncIds = Stream.of(client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards())
.map(shardStats -> shardStats.getCommitStats().syncId())
.collect(Collectors.toSet());
assertThat(syncIds, hasSize(1));
}

public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin {
final AtomicBoolean throwParsingError = new AtomicBoolean();
@Override
Expand Down

0 comments on commit 75be2a6

Please sign in to comment.