diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index f7d0de08ed65c..b0a26b88f38e8 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -8,14 +8,28 @@ package org.opensearch.indices.replication;
 
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.junit.Assert;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.admin.indices.segments.IndexShardSegments;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
+import org.opensearch.action.admin.indices.segments.ShardSegments;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.IndexModule;
+import org.opensearch.index.engine.Segment;
 import org.opensearch.test.BackgroundIndexer;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -44,24 +58,39 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
         ensureGreen(INDEX_NAME);
 
         final int initialDocCount = scaledRandomIntBetween(0, 200);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
+            refresh(INDEX_NAME);
+
+            // wait a short amount of time to give replication a chance to complete.
+            Thread.sleep(1000);
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            flushAndRefresh(INDEX_NAME);
+            Thread.sleep(1000);
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
         }
-        refresh(INDEX_NAME);
-        // wait a short amount of time to give replication a chance to complete.
-        Thread.sleep(1000);
-        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-
-        final int additionalDocCount = scaledRandomIntBetween(0, 200);
-        final int totalDocs = initialDocCount + additionalDocCount;
-        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
-            waitForDocs(additionalDocCount, indexer);
-        }
-        flush(INDEX_NAME);
-        Thread.sleep(1000);
-        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
-        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
     }
 
     public void testReplicationAfterForceMerge() throws Exception {
@@ -71,29 +100,48 @@ public void testReplicationAfterForceMerge() throws Exception {
         ensureGreen(INDEX_NAME);
 
         final int initialDocCount = scaledRandomIntBetween(0, 200);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
+
+            flush(INDEX_NAME);
+            // wait a short amount of time to give replication a chance to complete.
+            Thread.sleep(1000);
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+
+            // Index a second set of docs so we can merge into one segment.
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+            // This case tests that replicas preserve these files so the local store is not corrupt.
+            client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+            Thread.sleep(1000);
+            refresh(INDEX_NAME);
+            Thread.sleep(1000);
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
         }
-        flush(INDEX_NAME);
-        // wait a short amount of time to give replication a chance to complete.
-        Thread.sleep(1000);
-        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-
-        final int additionalDocCount = scaledRandomIntBetween(0, 200);
-        final int totalDocs = initialDocCount + additionalDocCount;
-        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
-            waitForDocs(additionalDocCount, indexer);
-        }
-        // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
-        // This case tests that replicas preserve these files so the local store is not corrupt.
-        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
-        Thread.sleep(1000);
-        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
-        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
     }
 
-    public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
+    public void testReplicaSetupAfterPrimaryIndexesDocs() {
         final String nodeA = internalCluster().startNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
         ensureGreen(INDEX_NAME);
@@ -122,5 +170,104 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
         ensureGreen(INDEX_NAME);
         assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
         assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
+        assertSegmentStats(REPLICA_COUNT);
+    }
+
+    private void assertSegmentStats(int numberOfReplicas) {
+        client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() {
+            @Override
+            public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {
+
+                List<ShardSegments[]> segmentsByIndex = indicesSegmentResponse.getIndices()
+                    .values()
+                    .stream() // get list of IndexSegments
+                    .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
+                    .map(IndexShardSegments::getShards) // get list of segments across replication group
+                    .collect(Collectors.toList());
+
+                // There will be an entry in the list for each index.
+                for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+
+                    // Separate Primary & replica shards ShardSegments.
+                    final Map<Boolean, List<ShardSegments>> segmentListMap = Arrays.stream(replicationGroupSegments)
+                        .collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
+                    final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+                    final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+                    assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
+                    final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+
+                    // create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on
+                    // replicas.
+                    final Map<String, Segment> primarySegmentsMap = primaryShardSegments.getSegments()
+                        .stream()
+                        .collect(Collectors.toMap(Segment::getName, Function.identity()));
+                    // For every replica, ensure that its segments are in the same state as on the primary.
+                    // It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a
+                    // list comparison.
+                    // This equality check includes search/committed properties on the Segment. Combined with docCount checks,
+                    // this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest
+                    // commit point, even if they are not searchable.
+                    assertEquals(
+                        "There should be a ShardSegment entry for each replica in the replicationGroup",
+                        numberOfReplicas,
+                        replicaShardSegments.size()
+                    );
+
+                    for (ShardSegments shardSegment : replicaShardSegments) {
+                        for (Segment replicaSegment : shardSegment.getSegments()) {
+                            final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName());
+                            assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail("Error fetching segment stats");
+            }
+        });
+    }
+
+    public void testDeleteOperations() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+
+        createIndex(
+            INDEX_NAME,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
+                .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
+                .build()
+        );
+        ensureGreen(INDEX_NAME);
+        client().prepareIndex(INDEX_NAME)
+            .setId("1")
+            .setSource("foo", "bar")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+            .get();
+
+        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+
+        client().prepareIndex(INDEX_NAME)
+            .setId("2")
+            .setSource("fooo", "baar")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
+            .get();
+
+        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
+        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
+
+        // Now delete with blockUntilRefresh
+        client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
+        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+
+        client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
+        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
+        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java
index 86b3b835c88e5..ee1b04b7480da 100644
--- a/server/src/main/java/org/opensearch/index/engine/Engine.java
+++ b/server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
         }
     }
 
-    public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}
+    public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {}
 
     public long getProcessedLocalCheckpoint() {
         return 0L;
@@ -369,6 +369,8 @@ public Condition newCondition() {
      */
     public abstract DeleteResult delete(Delete delete) throws IOException;
 
+    public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException;
+
    public abstract NoOpResult noOp(NoOp noOp) throws IOException;
 
     /**
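The Engine hunk above replaces updateCurrentInfos with finalizeReplication(SegmentInfos, Store.MetadataSnapshot, long) and introduces an abstract addDeleteOperationToTranslog(Delete). A rough sketch of how replica-side code is expected to drive the new finalize hook follows; the class and method names in this sketch are illustrative and are not part of the patch.

// Illustrative sketch only - not part of this patch.
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.store.Store;

import java.io.IOException;

final class ReplicaFinalizeExample {
    // After copying segment files from the primary, a replica hands the primary's in-memory
    // SegmentInfos and metadata snapshot to the engine. The base Engine treats this as a no-op;
    // InternalEngine (changed below) cleans and verifies the local store, swaps the reader's
    // SegmentInfos, fast-forwards the processed checkpoint, and refreshes.
    static void applyCheckpoint(Engine engine, SegmentInfos infos, Store.MetadataSnapshot primarySnapshot, long seqNo)
        throws IOException {
        engine.finalizeReplication(infos, primarySnapshot, seqNo);
    }
}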
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index fa64e78e40c45..1ccd52f9652d8 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -36,21 +36,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.LiveIndexWriterConfig;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.ShuffleForcedMergePolicy;
-import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
-import org.apache.lucene.index.StandardDirectoryReader;
-import org.apache.lucene.index.Term;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -104,6 +90,7 @@
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.OpenSearchMergePolicy;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.Store;
 import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogConfig;
@@ -336,11 +323,28 @@ public InternalEngine(EngineConfig engineConfig) {
     }
 
     @Override
-    public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
+    public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo)
+        throws IOException {
         assert engineConfig.isReadOnly() : "Only replicas should update Infos";
+
+        store.incRef();
+        try {
+            refreshLastCommittedSegmentInfos();
+            // clean up the local store of old segment files
+            // and validate the latest segment infos against the snapshot sent from the primary shard.
+            store.cleanupAndVerify(
+                "finalize - clean with in memory infos",
+                expectedMetadata,
+                store.getMetadata(infos),
+                store.getMetadata(lastCommittedSegmentInfos)
+            );
+        } finally {
+            store.decRef();
+        }
+        // Update the current infos reference on the Engine's reader.
         externalReaderManager.internalReaderManager.updateSegments(infos);
-        externalReaderManager.maybeRefresh();
         localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
+        externalReaderManager.maybeRefresh();
     }
 
     private LocalCheckpointTracker createLocalCheckpointTracker(
@@ -698,11 +702,12 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
     }
 
     private DirectoryReader getDirectoryReader() throws IOException {
-        // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
+        // for segment replication: replicas should create the reader from store, and we don't want an open IW on replicas.
+        // We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper, as deletes are handled via soft deletes when segment replication is enabled.
         if (engineConfig.isReadOnly()) {
-            return DirectoryReader.open(store.directory());
+            return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
         }
-        return DirectoryReader.open(indexWriter);
+        return DirectoryReader.open(indexWriter, true, true);
     }
 
     @Override
@@ -1506,8 +1511,7 @@ public DeleteResult delete(Delete delete) throws IOException {
             }
         }
         if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
-            final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
-            deleteResult.setTranslogLocation(location);
+            addDeleteOperationToTranslog(delete, deleteResult);
         }
         localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
         if (deleteResult.getTranslogLocation() == null) {
@@ -1531,6 +1535,30 @@
         return deleteResult;
     }
 
+    @Override
+    public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
+        try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
+            DeletionStrategy plan = deletionStrategyForOperation(delete);
+            DeleteResult deleteResult = new DeleteResult(
+                plan.versionOfDeletion,
+                delete.primaryTerm(),
+                delete.seqNo(),
+                plan.currentlyDeleted == false
+            );
+            addDeleteOperationToTranslog(delete, deleteResult);
+            deleteResult.setTook(System.nanoTime() - delete.startTime());
+            deleteResult.freeze();
+            return deleteResult;
+        }
+    }
+
+    private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException {
+        if (deleteResult.getResultType() == Result.Type.SUCCESS) {
+            final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
+            deleteResult.setTranslogLocation(location);
+        }
+    }
+
     private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) {
         assert operation.origin() == Operation.Origin.PRIMARY : operation;
         assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation;
@@ -2268,6 +2296,10 @@ public SegmentInfos getLatestSegmentInfos() {
         OpenSearchDirectoryReader reader = null;
         try {
             reader = externalReaderManager.internalReaderManager.acquire();
+            // This is safe, as we always wrap the StandardDirectoryReader with a SoftDeletesDirectoryReaderWrapper for replicas when segment replication is enabled.
+            if (engineConfig.isReadOnly()) {
+                return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()).getSegmentInfos();
+            }
             return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
         } catch (IOException e) {
             throw new EngineException(shardId, e.getMessage(), e);
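With the InternalEngine change above, a replica's reader is always wrapped in a SoftDeletesDirectoryReaderWrapper, so documents deleted on the primary disappear from replica searches even though the copied segment files still contain them. A self-contained sketch of that effect; it assumes the OpenSearch soft-deletes field name ("__soft_deletes", i.e. Lucene.SOFT_DELETES_FIELD) and is not part of the patch.

// Illustrative sketch only - not part of this patch.
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.store.Directory;

import java.io.IOException;

final class SoftDeletesReaderExample {
    static int visibleDocs(Directory dir) throws IOException {
        // Soft-deleted documents remain in the segment files that replicas copy,
        // but the wrapper masks them, so numDocs() reflects only live documents.
        try (DirectoryReader reader = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(dir), "__soft_deletes")) {
            return reader.numDocs();
        }
    }
}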
diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java
index a0f9f9c5d7a5d..91404ecf62de8 100644
--- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java
+++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java
@@ -34,14 +34,11 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.LeafReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.SearcherManager;
 import org.opensearch.common.SuppressForbidden;
+import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 
 import java.io.IOException;
@@ -98,7 +95,8 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
         } else {
             // Open a new reader, sharing any common segment readers with the old one:
             DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
-            reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId());
+            final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(innerReader, Lucene.SOFT_DELETES_FIELD);
+            reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
             logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader);
         }
         return reader;
diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
index cd04cb9a7a54c..b2aebb41acb69 100644
--- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
@@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) {
         throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
     }
 
+    @Override
+    public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
+        assert false : "this should not be called";
+        throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
+    }
+
     @Override
     public NoOpResult noOp(NoOp noOp) {
         assert false : "this should not be called";
diff --git a/server/src/main/java/org/opensearch/index/engine/Segment.java b/server/src/main/java/org/opensearch/index/engine/Segment.java
index 4874d0a30196f..a63f39af3462a 100644
--- a/server/src/main/java/org/opensearch/index/engine/Segment.java
+++ b/server/src/main/java/org/opensearch/index/engine/Segment.java
@@ -178,8 +178,17 @@ public boolean equals(Object o) {
 
         Segment segment = (Segment) o;
 
-        return Objects.equals(name, segment.name);
-
+        return Objects.equals(name, segment.name)
+            && Objects.equals(docCount, segment.docCount)
+            && Objects.equals(delDocCount, segment.delDocCount)
+            && Objects.equals(sizeInBytes, segment.sizeInBytes)
+            && Objects.equals(search, segment.search)
+            && Objects.equals(committed, segment.committed)
+            && Objects.equals(attributes, segment.attributes)
+            && Objects.equals(version, segment.version)
+            && Objects.equals(compound, segment.compound)
+            && Objects.equals(mergeId, segment.mergeId)
+            && Objects.equals(generation, segment.generation);
     }
 
     @Override
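The Segment.equals change above widens equality from name-only to the full segment state, which the integration test added earlier relies on when asserting that each replica segment is identical to the primary's. The hunk leaves hashCode untouched; since equal segments always share a name, the equals/hashCode contract still holds. A small sketch of the comparison this enables; the helper class and method are illustrative, not part of the patch.

// Illustrative sketch only - not part of this patch.
import org.opensearch.index.engine.Segment;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SegmentComparisonExample {
    // Returns true when every replica segment matches the primary's copy of the same segment:
    // same name, doc counts, size, search/committed flags, version, and so on.
    static boolean replicaMatchesPrimary(List<Segment> primarySegments, List<Segment> replicaSegments) {
        Map<String, Segment> byName = primarySegments.stream()
            .collect(Collectors.toMap(Segment::getName, Function.identity()));
        return replicaSegments.stream().allMatch(s -> s.equals(byName.get(s.getName())));
    }
}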
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index f2a071248ef55..174d9f6621ae2 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() {
 
     private final CheckpointRefreshListener checkpointRefreshListener;
 
+    private volatile ReplicationCheckpoint latestReceivedCheckpoint;
+
     public IndexShard(
         final ShardRouting shardRouting,
         final IndexSettings indexSettings,
@@ -1139,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation(
             + "]";
         ensureWriteAllowed(origin);
         final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
+        if (origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()) {
+            return getEngine().addDeleteOperationToTranslog(delete);
+        }
         return delete(engine, delete);
     }
 
@@ -1439,16 +1444,18 @@ public SegmentInfos getLatestSegmentInfos() {
     }
 
     public ReplicationCheckpoint getLatestReplicationCheckpoint() {
+        final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
         return new ReplicationCheckpoint(
             this.shardId,
             getOperationPrimaryTerm(),
-            getLatestSegmentInfos().getGeneration(),
-            getProcessedLocalCheckpoint()
+            latestSegmentInfos.getGeneration(),
+            getProcessedLocalCheckpoint(),
+            latestSegmentInfos.getVersion()
         );
     }
 
-    public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
-        getEngine().updateCurrentInfos(infos, seqNo);
+    public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException {
+        getEngine().finalizeReplication(infos, expectedMetadata, seqNo);
     }
 
     /**
@@ -3652,32 +3659,36 @@ public synchronized void onNewCheckpoint(
         final PrimaryShardReplicationSource source,
         final SegmentReplicationReplicaService segmentReplicationReplicaService
     ) {
-        logger.debug("Checkpoint received {}", request.getCheckpoint());
-        ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
-        logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint());
-        if (localCheckpoint.equals(request.getCheckpoint())) {
-            logger.debug("Ignore - Shard is already on checkpoint");
-            return;
-        }
-        if (state.equals(IndexShardState.STARTED) == false) {
-            logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
-            return;
-        }
-        if (isReplicating()) {
-            logger.debug("Ignore - shard is currently replicating to a checkpoint");
-            return;
+        final ReplicationCheckpoint requestCheckpoint = request.getCheckpoint();
+        logger.debug("Checkpoint received {}", requestCheckpoint);
+
+        if (requestCheckpoint.isAheadOf(latestReceivedCheckpoint)) {
+            latestReceivedCheckpoint = requestCheckpoint;
         }
+
+        if (shouldProcessCheckpoint(requestCheckpoint) == false) return;
         try {
-            final ReplicationCheckpoint checkpoint = request.getCheckpoint();
-            logger.trace("Received new checkpoint {}", checkpoint);
+            logger.debug("Processing new checkpoint {}", requestCheckpoint);
             segmentReplicationReplicaService.startReplication(
-                checkpoint,
+                requestCheckpoint,
                 this,
                 source,
                 new SegmentReplicationReplicaService.SegmentReplicationListener() {
                     @Override
                     public void onReplicationDone(SegmentReplicationState state) {
                         logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
+                        // If we received a checkpoint during the copy event that is ahead of this one,
+                        // try to process it.
+                        if (latestReceivedCheckpoint.isAheadOf(getLatestReplicationCheckpoint())) {
+                            threadPool.generic()
+                                .execute(
+                                    () -> onNewCheckpoint(
+                                        new PublishCheckpointRequest(latestReceivedCheckpoint),
+                                        source,
+                                        segmentReplicationReplicaService
+                                    )
+                                );
+                        }
                     }
 
                     @Override
@@ -3695,6 +3706,28 @@ public void onReplicationFailure(
             }
         }
     }
 
+    private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
+        ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
+        logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint());
+        if (state.equals(IndexShardState.STARTED) == false) {
+            logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
+            return false;
+        }
+        if (isReplicating()) {
+            logger.debug("Ignore - shard is currently replicating to a checkpoint");
+            return false;
+        }
+        if (localCheckpoint.isAheadOf(requestCheckpoint)) {
+            logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint);
+            return false;
+        }
+        if (localCheckpoint.equals(requestCheckpoint)) {
+            logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint);
+            return false;
+        }
+        return true;
+    }
+
     public SegmentReplicationState getReplicationState() {
         return this.segRepState;
     }
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index e89abab2d1c49..55ae60051b0b1 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -691,12 +691,16 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
      * @param localSnapshot The local snapshot from in memory SegmentInfos.
      * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
      */
-    public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
+    public void cleanupAndVerify(
+        String reason,
+        MetadataSnapshot remoteSnapshot,
+        MetadataSnapshot localSnapshot,
+        MetadataSnapshot latestCommitPointMetadata
+    ) throws IOException {
         // fetch a snapshot from the latest on disk Segments_N file. This can be behind
         // the passed in local in memory snapshot, so we want to ensure files it references are not removed.
         metadataLock.writeLock().lock();
         try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-            final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
             cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
             verifyAfterCleanup(remoteSnapshot, localSnapshot);
         } finally {
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java
index 1fa599c4bda49..5d8e0700ecce3 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java
@@ -43,7 +43,8 @@ public class CopyState extends AbstractRefCounted {
             shardId,
             shard.getOperationPrimaryTerm(),
             segmentInfos.getGeneration(),
-            shard.getProcessedLocalCheckpoint()
+            shard.getProcessedLocalCheckpoint(),
+            segmentInfos.getVersion()
         );
 
         // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N.
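The ReplicationCheckpoint diff that follows adds the SegmentInfos version to the checkpoint and an isAheadOf comparison; the IndexShard changes above use it both to ignore stale checkpoints and to replay the latest received checkpoint after a copy finishes. A small sketch of the intended ordering; the index name and numbers are made up.

// Illustrative sketch only - not part of this patch.
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;

final class CheckpointOrderingExample {
    static void demo() {
        ShardId shardId = new ShardId("index", "_na_", 0);
        ReplicationCheckpoint older = new ReplicationCheckpoint(shardId, 1, 5, 10, 3); // SegmentInfos version 3
        ReplicationCheckpoint newer = new ReplicationCheckpoint(shardId, 1, 5, 12, 4); // SegmentInfos version 4

        assert newer.isAheadOf(older);  // a strictly larger SegmentInfos version wins
        assert !older.isAheadOf(newer);
        assert newer.isAheadOf(null);   // anything is ahead of "no checkpoint received yet"
    }
}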
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java
index eb990078f35b9..38b4099a7755e 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication.copy;
 
+import org.opensearch.common.Nullable;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
 import org.opensearch.common.io.stream.Writeable;
@@ -22,12 +23,14 @@ public class ReplicationCheckpoint implements Writeable {
     private final long primaryTerm;
     private final long segmentsGen;
     private final long seqNo;
+    private final long version;
 
-    public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segments_gen, long seqNo) {
+    public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long version) {
         this.shardId = shardId;
         this.primaryTerm = primaryTerm;
-        this.segmentsGen = segments_gen;
+        this.segmentsGen = segmentsGen;
         this.seqNo = seqNo;
+        this.version = version;
     }
 
     public ReplicationCheckpoint(StreamInput in) throws IOException {
@@ -35,6 +38,7 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
         primaryTerm = in.readLong();
         segmentsGen = in.readLong();
         seqNo = in.readLong();
+        version = in.readLong();
     }
 
     public long getPrimaryTerm() {
@@ -45,6 +49,10 @@ public long getSegmentsGen() {
         return segmentsGen;
     }
 
+    public long getVersion() {
+        return version;
+    }
+
     public long getSeqNo() {
         return seqNo;
     }
@@ -59,6 +67,7 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeLong(primaryTerm);
         out.writeLong(segmentsGen);
         out.writeLong(seqNo);
+        out.writeLong(version);
     }
 
     @Override
@@ -69,6 +78,7 @@ public boolean equals(Object o) {
         return primaryTerm == that.primaryTerm
             && segmentsGen == that.segmentsGen
             && seqNo == that.seqNo
+            && version == that.version
             && Objects.equals(shardId, that.shardId);
     }
 
@@ -77,6 +87,10 @@ public int hashCode() {
         return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo);
     }
 
+    public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
+        return other == null || version > other.getVersion();
+    }
+
     @Override
     public String toString() {
         return "ReplicationCheckpoint{"
@@ -88,6 +102,8 @@ public String toString() {
             + segmentsGen
             + ", seqNo="
             + seqNo
+            + ", version="
+            + version
             + '}';
     }
 }
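In the SegmentReplicationTarget hunk below, the target now reads the primary's SegmentInfos straight from the wire bytes and delegates cleanup and verification to IndexShard.finalizeReplication. The toIndexInput helper it calls is outside this diff; a plausible self-contained equivalent, assuming Lucene's ByteBuffers inputs, is sketched here and is not part of the patch.

// Illustrative sketch only - not part of this patch.
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

final class SegmentInfosBytesExample {
    // Wraps the serialized SegmentInfos bytes in a checksumming IndexInput and reads the commit
    // for the generation advertised by the primary's checkpoint.
    static SegmentInfos readInfos(Directory directory, byte[] infosBytes, long generation) throws IOException {
        ChecksumIndexInput input = new BufferedChecksumIndexInput(
            new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(infosBytes))), "SegmentInfos")
        );
        return SegmentInfos.readCommit(directory, input, generation);
    }
}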
diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
index 2c8d1e19c8c92..e8fe555bff334 100644
--- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java
@@ -177,14 +177,6 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR
         final Store store = store();
         store.incRef();
         try {
-            if (indexShard.getRetentionLeases().leases().isEmpty()) {
-                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
-                indexShard.persistRetentionLeases();
-                assert indexShard.loadRetentionLeases().leases().isEmpty();
-            } else {
-                assert indexShard.assertRetentionLeasesPersisted();
-            }
-
             // Deserialize the new SegmentInfos object sent from the primary.
             final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
             SegmentInfos infos = SegmentInfos.readCommit(
@@ -192,16 +184,8 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR
                 toIndexInput(checkpointInfoResponse.getInfosBytes()),
                 responseCheckpoint.getSegmentsGen()
             );
-            // clean up the local store of old segment files
-            // and validate the latest segment infos against the snapshot sent from the primary shard.
-            store.cleanupAndVerify(
-                "finalize - clean with in memory infos",
-                checkpointInfoResponse.getSnapshot(),
-                store.getMetadata(infos)
-            );
-
-            // Update the current infos reference on the Engine's reader.
-            indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo());
+            indexShard.finalizeReplication(infos, checkpointInfoResponse.getSnapshot(), responseCheckpoint.getSeqNo());
         } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             // this is a fatal exception at this stage.
             // this means we transferred files from the remote that have not be checksummed and they are