From 96d5ee7e6674ea6954f8282e37cfd71faa89b2f9 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Wed, 24 Jul 2019 13:08:15 -0400
Subject: [PATCH] Do not load global checkpoint to ReplicationTracker in local recovery step (#44781)

If we force allocate an empty or stale primary, the global checkpoint on
replicas might be higher than the primary's, as the local recovery step
(introduced in #43463) loads the previous (stale) global checkpoint into
ReplicationTracker. There's no issue with the retention leases, as a new
lease with a higher term will supersede the stale one.

Relates #43463
---
 .../elasticsearch/index/shard/IndexShard.java | 36 +++++++++-------
 .../indices/recovery/IndexRecoveryIT.java     | 43 ++++++++++++++++---
 .../PeerRecoveryTargetServiceTests.java       | 19 ++++++--
 3 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 18db78789c914..ca59db776a219 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -171,6 +171,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -1407,7 +1408,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
                 recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
                 return recoveredOps;
             };
-            innerOpenEngineAndTranslog();
+            innerOpenEngineAndTranslog(() -> globalCheckpoint);
             getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
             logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
         } finally {
@@ -1533,6 +1534,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
         return opsRecovered;
     }
 
+    private void loadGlobalCheckpointToReplicationTracker() throws IOException {
+        // we have to set it before we open an engine and recover from the translog because
+        // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
+        // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
+        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
+    }
+
     /**
      * opens the engine on top of the existing lucene engine and translog.
      * Operations from the translog will be replayed to bring lucene up to date.
@@ -1548,7 +1558,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
             return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
                 translogRecoveryStats::incrementRecoveredOperations);
         };
-        innerOpenEngineAndTranslog();
+        loadGlobalCheckpointToReplicationTracker();
+        innerOpenEngineAndTranslog(replicationTracker);
         getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
     }
 
@@ -1559,25 +1570,20 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
     public void openEngineAndSkipTranslogRecovery() throws IOException {
         assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
-        innerOpenEngineAndTranslog();
+        loadGlobalCheckpointToReplicationTracker();
+        innerOpenEngineAndTranslog(replicationTracker);
         getEngine().skipTranslogRecovery();
     }
 
-    private void innerOpenEngineAndTranslog() throws IOException {
+    private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
-        final EngineConfig config = newEngineConfig();
+        final EngineConfig config = newEngineConfig(globalCheckpointSupplier);
 
         // we disable deletes since we allow for operations to be executed against the shard while recovering
         // but we need to make sure we don't loose deletes until we are done recovering
         config.setEnableGcDeletes(false);
-        // we have to set it before we open an engine and recover from the translog because
-        // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
-        // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
-        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
         updateRetentionLeasesOnReplica(loadRetentionLeases());
         assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
@@ -2646,7 +2652,7 @@ private DocumentMapperForType docMapper(String type) {
             mapperService.resolveDocumentType(type));
     }
 
-    private EngineConfig newEngineConfig() {
+    private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
         Sort indexSort = indexSortSupplier.get();
         return new EngineConfig(shardId, shardRouting.allocationId().getId(),
             threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
@@ -2656,7 +2662,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
+            indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
             () -> getOperationPrimaryTerm(), tombstoneDocSupplier());
     }
 
@@ -3293,7 +3299,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
                 // we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
                 // acquireXXXCommit and close works.
                final Engine readOnlyEngine =
-                    new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
+                    new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
                         @Override
                         public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
                             synchronized (mutex) {
@@ -3322,7 +3328,7 @@ public void close() throws IOException {
                     }
                 };
                 IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
+                newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
                 onNewEngine(newEngineReference.get());
             }
             final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index 2f9b67311d52b..0e57ecf35bcc2 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -42,6 +42,7 @@
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -51,6 +52,7 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
 import org.elasticsearch.index.engine.Engine;
@@ -76,6 +78,7 @@
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.engine.MockEngineSupport;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.test.transport.StubbableTransport;
@@ -84,7 +87,6 @@
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
-import org.junit.After;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -137,12 +139,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
             MockFSIndexStore.TestPlugin.class,
             RecoverySettingsChunkSizePlugin.class,
             TestAnalysisPlugin.class,
-            InternalSettingsPlugin.class);
+            InternalSettingsPlugin.class,
+            MockEngineFactoryPlugin.class);
     }
 
-    @After
-    public void assertConsistentHistoryInLuceneIndex() throws Exception {
+    @Override
+    protected void beforeIndexDeletion() throws Exception {
+        super.beforeIndexDeletion();
         internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
+        internalCluster().assertSeqNos();
+        internalCluster().assertSameDocIdsOnShards();
     }
 
     private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
@@ -1049,7 +1055,8 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
         for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
             if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
                 assertThat("total recovered translog operations must include both local and remote recovery",
-                    recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
+                    recoveryState.getTranslog().recoveredOperations(),
+                    greaterThanOrEqualTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
             }
         }
         for (String node : nodes) {
@@ -1116,4 +1123,30 @@ public void testRepeatedRecovery() throws Exception {
         ensureGreen(indexName);
     }
 
+    public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
+        internalCluster().startMasterOnlyNode(Settings.EMPTY);
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes));
+        final String indexName = "test";
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
+            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), randomBoolean())).get());
+        final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
+            .mapToObj(n -> client().prepareIndex(indexName, "type").setSource("foo", "bar"))
+            .collect(Collectors.toList());
+        indexRandom(randomBoolean(), true, true, indexRequests);
+        ensureGreen();
+        internalCluster().stopRandomDataNode();
+        internalCluster().stopRandomDataNode();
+        final String nodeWithoutData = internalCluster().startDataOnlyNode();
+        assertAcked(client().admin().cluster().prepareReroute()
+            .add(new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeWithoutData, true)).get());
+        internalCluster().startDataOnlyNode(randomNodeDataPathSettings);
+        ensureGreen();
+        for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
+            assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+            assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+            assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index e3d299067910f..4a0e893cdc051 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -161,6 +161,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(shard);
 
         // good copy
@@ -168,15 +169,23 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         long globalCheckpoint = populateData.apply(shard);
         Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
         assertTrue(safeCommit.isPresent());
+        int expectedTotalLocal = 0;
+        try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromMinSeqNo(safeCommit.get().localCheckpoint + 1)) {
+            Translog.Operation op;
+            while ((op = snapshot.next()) != null) {
+                if (op.seqNo() <= globalCheckpoint) {
+                    expectedTotalLocal++;
+                }
+            }
+        }
         IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
             RecoverySource.PeerRecoverySource.INSTANCE));
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
         assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
-        assertThat(replica.recoveryState().getTranslog().totalLocal(),
-            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
-        assertThat(replica.recoveryState().getTranslog().recoveredOperations(),
-            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
 
         // corrupted copy
@@ -192,6 +201,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
 
         // copy with truncated translog
@@ -213,6 +223,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
             assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         }
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
     }
 }
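
Note on the pattern this patch adopts: previously, innerOpenEngineAndTranslog() itself read the persisted global checkpoint and pushed it into the shared ReplicationTracker. Now the local recovery step hands the engine a one-off LongSupplier holding the value read from the translog, and only the full recovery paths (openEngineAndRecoverFromTranslog and openEngineAndSkipTranslogRecovery) still load that value into the tracker via loadGlobalCheckpointToReplicationTracker(). Below is a minimal, self-contained sketch of that shape; every name in it is a simplified stand-in, not the real Elasticsearch type.

import java.util.function.LongSupplier;

// Simplified stand-in for ReplicationTracker: it can both receive a replica's
// global checkpoint and serve as the engine's LongSupplier view of it.
class TrackerSketch implements LongSupplier {
    static final long UNASSIGNED_SEQ_NO = -2;
    private long globalCheckpoint = UNASSIGNED_SEQ_NO;

    @Override
    public long getAsLong() {
        return globalCheckpoint;
    }

    void updateGlobalCheckpointOnReplica(long checkpoint, String reason) {
        globalCheckpoint = checkpoint;
    }
}

public class ShardRecoverySketch {
    private final TrackerSketch replicationTracker = new TrackerSketch();

    // Stand-in for Translog.readGlobalCheckpoint(translogPath, translogUUID):
    // whatever (possibly stale) checkpoint the previous shard copy persisted.
    private long readPersistedGlobalCheckpoint() {
        return 42L;
    }

    // Local recovery step: the on-disk checkpoint feeds the engine through a
    // one-off supplier and never reaches the replication tracker.
    void recoverLocallyUpToGlobalCheckpoint() {
        final long globalCheckpoint = readPersistedGlobalCheckpoint();
        openEngine(() -> globalCheckpoint);
    }

    // Full recovery path: load the on-disk checkpoint into the tracker first,
    // then let the engine observe the tracker itself.
    void openEngineAndRecoverFromTranslog() {
        replicationTracker.updateGlobalCheckpointOnReplica(readPersistedGlobalCheckpoint(), "read from translog checkpoint");
        openEngine(replicationTracker);
    }

    private void openEngine(LongSupplier globalCheckpointSupplier) {
        // Stand-in for innerOpenEngineAndTranslog(globalCheckpointSupplier): the
        // engine config captures the supplier, mirroring newEngineConfig(...) above.
        System.out.println("engine sees global checkpoint " + globalCheckpointSupplier.getAsLong());
    }

    public static void main(String[] args) {
        ShardRecoverySketch shard = new ShardRecoverySketch();
        shard.recoverLocallyUpToGlobalCheckpoint();   // tracker still at UNASSIGNED_SEQ_NO
        shard.openEngineAndRecoverFromTranslog();     // tracker now holds the persisted value
    }
}

After the local step the tracker still reports UNASSIGNED_SEQ_NO, which is exactly what the new getLastKnownGlobalCheckpoint() assertions in PeerRecoveryTargetServiceTests pin down: a stale on-disk checkpoint can no longer leak into replication state when a primary is force-allocated empty or stale.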
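
The unit test similarly stops assuming one translog operation per sequence number between the safe commit's local checkpoint and the global checkpoint, and instead counts the operations actually present in a translog snapshot. A sketch of just that counting rule follows; the long[] array is a hypothetical stand-in for a Translog.Snapshot.

public class ExpectedTotalLocalSketch {
    // Count the operations the local recovery step should replay: those above the
    // safe commit's local checkpoint and at or below the persisted global checkpoint.
    static int expectedTotalLocal(long[] translogOpSeqNos, long safeCommitLocalCheckpoint, long globalCheckpoint) {
        int count = 0;
        for (long seqNo : translogOpSeqNos) {
            if (seqNo > safeCommitLocalCheckpoint && seqNo <= globalCheckpoint) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Safe commit at local checkpoint 2 and global checkpoint 5: ops 3..5 are
        // replayed locally; ops 6 and 7 are left for the remote (peer) recovery phase.
        System.out.println(expectedTotalLocal(new long[]{3, 4, 5, 6, 7}, 2, 5)); // prints 3
    }
}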