diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index f7d0de08ed65c..cf80120bc44a3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,14 +8,28 @@ package org.opensearch.indices.replication; +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.Assert; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexModule; +import org.opensearch.index.engine.Segment; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -44,24 +58,39 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); } - refresh(INDEX_NAME); - // wait a short amount of time to give replication a chance to complete. - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - - final int additionalDocCount = scaledRandomIntBetween(0, 200); - final int totalDocs = initialDocCount + additionalDocCount; - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { - waitForDocs(additionalDocCount, indexer); - } - flush(INDEX_NAME); - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } public void testReplicationAfterForceMerge() throws Exception { @@ -71,29 +100,48 @@ public void testReplicationAfterForceMerge() throws Exception { ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); + + flush(INDEX_NAME); + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + + // Index a second set of docs so we can merge into one segment. + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that replicas preserve these files so the local store is not corrupt. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + Thread.sleep(1000); + refresh(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); } - flush(INDEX_NAME); - // wait a short amount of time to give replication a chance to complete. - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - - final int additionalDocCount = scaledRandomIntBetween(0, 200); - final int totalDocs = initialDocCount + additionalDocCount; - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { - waitForDocs(additionalDocCount, indexer); - } - // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. - // This case tests that replicas preserve these files so the local store is not corrupt. - client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } - public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { + public void testReplicaSetupAfterPrimaryIndexesDocs() { final String nodeA = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -122,5 +170,63 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { ensureGreen(INDEX_NAME); assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertSegmentStats(REPLICA_COUNT); + } + + private void assertSegmentStats(int numberOfReplicas) { + client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() { + @Override + public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { + + List segmentsByIndex = indicesSegmentResponse.getIndices() + .values() + .stream() // get list of IndexSegments + .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group + .map(IndexShardSegments::getShards) // get list of segments across replication group + .collect(Collectors.toList()); + + // There will be an entry in the list for each index. + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + + // Separate Primary & replica shards ShardSegments. + final Map> segmentListMap = Arrays.stream(replicationGroupSegments) + .collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegments = segmentListMap.get(false); + + assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + + // create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on + // replicas. + final Map primarySegmentsMap = primaryShardSegments.getSegments() + .stream() + .collect(Collectors.toMap(Segment::getName, Function.identity())); + // For every replica, ensure that its segments are in the same state as on the primary. + // It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a + // list comparison. + // This equality check includes search/committed properties on the Segment. Combined with docCount checks, + // this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest + // commit point, even if they are not searchable. + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegments.size() + ); + + for (ShardSegments shardSegment : replicaShardSegments) { + for (Segment replicaSegment : shardSegment.getSegments()) { + final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName()); + assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment); + } + } + } + } + + @Override + public void onFailure(Exception e) { + Assert.fail("Error fetching segment stats"); + } + }); } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 86b3b835c88e5..b3af769dc0ccf 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {} + public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {} public long getProcessedLocalCheckpoint() { return 0L; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 2c161e511edae..16fa8a80ec3e0 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -90,6 +90,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; @@ -322,11 +323,28 @@ public InternalEngine(EngineConfig engineConfig) { } @Override - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) + throws IOException { assert engineConfig.isReadOnly() : "Only replicas should update Infos"; + + store.incRef(); + try { + refreshLastCommittedSegmentInfos(); + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify( + "finalize - clean with in memory infos", + expectedMetadata, + store.getMetadata(infos), + store.getMetadata(lastCommittedSegmentInfos) + ); + } finally { + store.decRef(); + } + // Update the current infos reference on the Engine's reader. externalReaderManager.internalReaderManager.updateSegments(infos); - externalReaderManager.maybeRefresh(); localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + externalReaderManager.maybeRefresh(); } private LocalCheckpointTracker createLocalCheckpointTracker( diff --git a/server/src/main/java/org/opensearch/index/engine/Segment.java b/server/src/main/java/org/opensearch/index/engine/Segment.java index 4874d0a30196f..a63f39af3462a 100644 --- a/server/src/main/java/org/opensearch/index/engine/Segment.java +++ b/server/src/main/java/org/opensearch/index/engine/Segment.java @@ -178,8 +178,17 @@ public boolean equals(Object o) { Segment segment = (Segment) o; - return Objects.equals(name, segment.name); - + return Objects.equals(name, segment.name) + && Objects.equals(docCount, segment.docCount) + && Objects.equals(delDocCount, segment.delDocCount) + && Objects.equals(sizeInBytes, segment.sizeInBytes) + && Objects.equals(search, segment.search) + && Objects.equals(committed, segment.committed) + && Objects.equals(attributes, segment.attributes) + && Objects.equals(version, segment.version) + && Objects.equals(compound, segment.compound) + && Objects.equals(mergeId, segment.mergeId) + && Objects.equals(generation, segment.generation); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2a071248ef55..abe0471d5251c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() { private final CheckpointRefreshListener checkpointRefreshListener; + private volatile ReplicationCheckpoint latestReceivedCheckpoint; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -1439,16 +1441,18 @@ public SegmentInfos getLatestSegmentInfos() { } public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); return new ReplicationCheckpoint( this.shardId, getOperationPrimaryTerm(), - getLatestSegmentInfos().getGeneration(), - getProcessedLocalCheckpoint() + latestSegmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + latestSegmentInfos.getVersion() ); } - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { - getEngine().updateCurrentInfos(infos, seqNo); + public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException { + getEngine().finalizeReplication(infos, expectedMetadata, seqNo); } /** @@ -3652,32 +3656,36 @@ public synchronized void onNewCheckpoint( final PrimaryShardReplicationSource source, final SegmentReplicationReplicaService segmentReplicationReplicaService ) { - logger.debug("Checkpoint received {}", request.getCheckpoint()); - ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); - logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint()); - if (localCheckpoint.equals(request.getCheckpoint())) { - logger.debug("Ignore - Shard is already on checkpoint"); - return; - } - if (state.equals(IndexShardState.STARTED) == false) { - logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); - return; - } - if (isReplicating()) { - logger.debug("Ignore - shard is currently replicating to a checkpoint"); - return; + final ReplicationCheckpoint requestCheckpoint = request.getCheckpoint(); + logger.debug("Checkpoint received {}", requestCheckpoint); + + if (requestCheckpoint.isAheadOf(latestReceivedCheckpoint)) { + latestReceivedCheckpoint = requestCheckpoint; } + + if (shouldProcessCheckpoint(requestCheckpoint) == false) return; try { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received new checkpoint {}", checkpoint); + logger.debug("Processing new checkpoint {}", requestCheckpoint); segmentReplicationReplicaService.startReplication( - checkpoint, + requestCheckpoint, this, source, new SegmentReplicationReplicaService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); + // if we received a checkpoint during the copy event that is ahead of this + // try and process it. + if (latestReceivedCheckpoint.isAheadOf(getLatestReplicationCheckpoint())) { + threadPool.generic() + .execute( + () -> onNewCheckpoint( + new PublishCheckpointRequest(latestReceivedCheckpoint), + source, + segmentReplicationReplicaService + ) + ); + } } @Override @@ -3695,6 +3703,28 @@ public void onReplicationFailure( } } + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint()); + if (state.equals(IndexShardState.STARTED) == false) { + logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); + return false; + } + if (isReplicating()) { + logger.debug("Ignore - shard is currently replicating to a checkpoint"); + return false; + } + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + return false; + } + return true; + } + public SegmentReplicationState getReplicationState() { return this.segRepState; } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index e89abab2d1c49..55ae60051b0b1 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -691,12 +691,16 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @param localSnapshot The local snapshot from in memory SegmentInfos. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { + public void cleanupAndVerify( + String reason, + MetadataSnapshot remoteSnapshot, + MetadataSnapshot localSnapshot, + MetadataSnapshot latestCommitPointMetadata + ) throws IOException { // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata); verifyAfterCleanup(remoteSnapshot, localSnapshot); } finally { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 1fa599c4bda49..5d8e0700ecce3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -43,7 +43,8 @@ public class CopyState extends AbstractRefCounted { shardId, shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), - shard.getProcessedLocalCheckpoint() + shard.getProcessedLocalCheckpoint(), + segmentInfos.getVersion() ); // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java index eb990078f35b9..38b4099a7755e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.copy; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -22,12 +23,14 @@ public class ReplicationCheckpoint implements Writeable { private final long primaryTerm; private final long segmentsGen; private final long seqNo; + private final long version; - public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segments_gen, long seqNo) { + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long version) { this.shardId = shardId; this.primaryTerm = primaryTerm; - this.segmentsGen = segments_gen; + this.segmentsGen = segmentsGen; this.seqNo = seqNo; + this.version = version; } public ReplicationCheckpoint(StreamInput in) throws IOException { @@ -35,6 +38,7 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { primaryTerm = in.readLong(); segmentsGen = in.readLong(); seqNo = in.readLong(); + version = in.readLong(); } public long getPrimaryTerm() { @@ -45,6 +49,10 @@ public long getSegmentsGen() { return segmentsGen; } + public long getVersion() { + return version; + } + public long getSeqNo() { return seqNo; } @@ -59,6 +67,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(primaryTerm); out.writeLong(segmentsGen); out.writeLong(seqNo); + out.writeLong(version); } @Override @@ -69,6 +78,7 @@ public boolean equals(Object o) { return primaryTerm == that.primaryTerm && segmentsGen == that.segmentsGen && seqNo == that.seqNo + && version == that.version && Objects.equals(shardId, that.shardId); } @@ -77,6 +87,10 @@ public int hashCode() { return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo); } + public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { + return other == null || version > other.getVersion(); + } + @Override public String toString() { return "ReplicationCheckpoint{" @@ -88,6 +102,8 @@ public String toString() { + segmentsGen + ", seqNo=" + seqNo + + ", version=" + + version + '}'; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index 2c8d1e19c8c92..e8fe555bff334 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -177,14 +177,6 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR final Store store = store(); store.incRef(); try { - if (indexShard.getRetentionLeases().leases().isEmpty()) { - // if empty, may be a fresh IndexShard, so write an empty leases file to disk - indexShard.persistRetentionLeases(); - assert indexShard.loadRetentionLeases().leases().isEmpty(); - } else { - assert indexShard.assertRetentionLeasesPersisted(); - } - // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); SegmentInfos infos = SegmentInfos.readCommit( @@ -192,16 +184,8 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); - // clean up the local store of old segment files - // and validate the latest segment infos against the snapshot sent from the primary shard. - store.cleanupAndVerify( - "finalize - clean with in memory infos", - checkpointInfoResponse.getSnapshot(), - store.getMetadata(infos) - ); - // Update the current infos reference on the Engine's reader. - indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos, checkpointInfoResponse.getSnapshot(), responseCheckpoint.getSeqNo()); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are