From c480ab5985206cdba12fe3c19e37ea61fd2d11e3 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 5 Apr 2022 15:22:19 +0000 Subject: [PATCH 1/7] Draft PR for delete Operations, failing Integ tests Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 43 +++++++++++++++++++ .../org/opensearch/index/engine/Engine.java | 2 + .../index/engine/InternalEngine.java | 27 +++++++++++- .../index/engine/OpenSearchReaderManager.java | 4 ++ .../index/engine/ReadOnlyEngine.java | 6 +++ .../opensearch/index/shard/IndexShard.java | 3 ++ 6 files changed, 83 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index f7d0de08ed65c..4d34c05fd0e29 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,6 +8,9 @@ package org.opensearch.indices.replication; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -123,4 +126,44 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); } + + public void testDelOps()throws Exception{ + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build() + ); + ensureGreen(INDEX_NAME); + IndexResponse index = client().prepareIndex(INDEX_NAME) + .setId("1") + .setSource("foo", "bar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); +// indexRandom(true, client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar")); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + IndexResponse index2 = client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("fooo", "baar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + +// indexRandom(true, client().prepareIndex(INDEX_NAME).setId("2").setSource("fooo", "baar")); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // Now delete with blockUntilRefresh + DeleteResponse delete = client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertEquals(DocWriteResponse.Result.DELETED, delete.getResult()); + assertFalse("request shouldn't have forced a refresh", delete.forcedRefresh()); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); 
+ assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 86b3b835c88e5..297f261d2b879 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,6 +369,8 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; + public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; + public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fa64e78e40c45..44a3f5114c0c6 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1506,8 +1506,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - deleteResult.setTranslogLocation(location); + addDeleteOperationToTranslog(delete, deleteResult); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1531,6 +1530,30 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } + @Override + public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { + DeletionStrategy plan = deletionStrategyForOperation(delete); + DeleteResult deleteResult = new DeleteResult( + plan.versionOfDeletion, + delete.primaryTerm(), + delete.seqNo(), + plan.currentlyDeleted == false + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + return deleteResult; + } + } + + private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException{ + if(deleteResult.getResultType() == Result.Type.SUCCESS){ + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index a0f9f9c5d7a5d..d85c647ed47f4 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -95,11 +95,15 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re // If not using NRT repl. 
if (currentInfos == null) { reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); + if (reader != null) { + logger.info("Num docs primary {}", reader.getDelegate().numDocs()); + } } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId()); logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); + logger.info("Num docs replica {}", reader.getDelegate().numDocs()); } return reader; } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cd04cb9a7a54c..b2aebb41acb69 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } + @Override + public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2a071248ef55..159c25b1c5ae0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1139,6 +1139,9 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + if(origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()){ + return getEngine().addDeleteOperationToTranslog(delete); + } return delete(engine, delete); } From cbc8d2d388881b36c55936c17bc48211b9da6d96 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 5 Apr 2022 21:40:51 +0000 Subject: [PATCH 2/7] adding soft delete changes Signed-off-by: Rishikesh1159 --- .../index/engine/InternalEngine.java | 19 ++++--------------- .../index/engine/OpenSearchReaderManager.java | 15 ++++++++------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 44a3f5114c0c6..cb1e57c61cdbe 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -36,21 +36,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.LeafReaderContext; -import 
org.apache.lucene.index.LiveIndexWriterConfig; -import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.ShuffleForcedMergePolicy; -import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; -import org.apache.lucene.index.StandardDirectoryReader; -import org.apache.lucene.index.Term; +import org.apache.lucene.index.*; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; @@ -2291,6 +2277,9 @@ public SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = externalReaderManager.internalReaderManager.acquire(); + if (engineConfig.isReadOnly()) { + return ((StandardDirectoryReader)((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()).getSegmentInfos(); + } return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index d85c647ed47f4..8e6ac4ec66486 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -34,14 +34,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.index.*; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import java.io.IOException; @@ -101,9 +98,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); - reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId()); + final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(innerReader, Lucene.SOFT_DELETES_FIELD); + logger.info("Doc count {}", softDeletesDirectoryReaderWrapper.numDocs()); + logger.info("Deleted doc count {}", softDeletesDirectoryReaderWrapper.numDeletedDocs()); + reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); + logger.info("reader Doc count {}", reader.numDocs()); + logger.info("reader Deleted doc count {}", reader.numDeletedDocs()); logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); - logger.info("Num docs replica {}", reader.getDelegate().numDocs()); } return reader; } From a11748864462760a07912b7b7ff684073aa90079 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 6 Apr 2022 22:56:38 +0000 Subject: [PATCH 3/7] adding soft delete wrapper in Directory Reader Signed-off-by: Rishikesh1159 --- 
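The soft-delete wrapping that patch 2 introduces (and that this patch extends to the reader opened from the store) relies on Lucene's SoftDeletesDirectoryReaderWrapper. A minimal, self-contained sketch of why the wrapper is needed before doc counts on a replica are correct, assuming an illustrative soft-deletes field name rather than OpenSearch's internal Lucene.SOFT_DELETES_FIELD constant; this is not part of the patch itself:

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.NumericDocValuesField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    public class SoftDeleteReaderSketch {
        // Illustrative field name; the real engine uses its own soft-deletes field constant.
        private static final String SOFT_DELETES_FIELD = "__soft_deletes";

        public static void main(String[] args) throws Exception {
            try (Directory dir = new ByteBuffersDirectory()) {
                IndexWriterConfig iwc = new IndexWriterConfig().setSoftDeletesField(SOFT_DELETES_FIELD);
                try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                    Document v1 = new Document();
                    v1.add(new StringField("id", "1", Field.Store.YES));
                    writer.addDocument(v1);

                    // Update id=1: the old copy is soft deleted (marked via doc values), not removed.
                    Document v2 = new Document();
                    v2.add(new StringField("id", "1", Field.Store.YES));
                    writer.softUpdateDocument(new Term("id", "1"), v2, new NumericDocValuesField(SOFT_DELETES_FIELD, 1));
                    writer.commit();
                }
                try (DirectoryReader plain = DirectoryReader.open(dir)) {
                    // A plain reader still counts the soft-deleted copy as live.
                    System.out.println("plain numDocs   = " + plain.numDocs());   // 2
                    // The wrapper hides soft-deleted documents, which is what replica readers need.
                    DirectoryReader wrapped = new SoftDeletesDirectoryReaderWrapper(plain, SOFT_DELETES_FIELD);
                    System.out.println("wrapped numDocs = " + wrapped.numDocs()); // 1
                }
            }
        }
    }

With the wrapper in place, replicated segments can carry deletes purely as doc-values marks, and searches through the wrapped reader no longer see the deleted documents.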
.../main/java/org/opensearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index cb1e57c61cdbe..073110c0be81a 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -686,7 +686,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. if (engineConfig.isReadOnly()) { - return DirectoryReader.open(store.directory()); + return (SoftDeletesDirectoryReaderWrapper)DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); } From 128440ba18a8a2a6efacbb1b9d1f74a6d5d8fcd9 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 6 Apr 2022 23:13:01 +0000 Subject: [PATCH 4/7] Revert "Draft PR for delete Operations, failing Integ tests" This reverts commit 24e3769652f3438899681389203bc22d0c426aac. Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 43 ------------------- .../org/opensearch/index/engine/Engine.java | 2 - .../index/engine/InternalEngine.java | 27 +----------- .../index/engine/OpenSearchReaderManager.java | 3 -- .../index/engine/ReadOnlyEngine.java | 6 --- .../opensearch/index/shard/IndexShard.java | 3 -- 6 files changed, 2 insertions(+), 82 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4d34c05fd0e29..f7d0de08ed65c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,9 +8,6 @@ package org.opensearch.indices.replication; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -126,44 +123,4 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); } - - public void testDelOps()throws Exception{ - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); - - createIndex( - INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) - .build() - ); - ensureGreen(INDEX_NAME); - IndexResponse index = client().prepareIndex(INDEX_NAME) - .setId("1") - .setSource("foo", "bar") - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .get(); -// indexRandom(true, client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar")); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 
1); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - - IndexResponse index2 = client().prepareIndex(INDEX_NAME) - .setId("2") - .setSource("fooo", "baar") - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .get(); - -// indexRandom(true, client().prepareIndex(INDEX_NAME).setId("2").setSource("fooo", "baar")); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); - - // Now delete with blockUntilRefresh - DeleteResponse delete = client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); - assertEquals(DocWriteResponse.Result.DELETED, delete.getResult()); - assertFalse("request shouldn't have forced a refresh", delete.forcedRefresh()); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 297f261d2b879..86b3b835c88e5 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,8 +369,6 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; - public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; - public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 073110c0be81a..2c161e511edae 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1492,7 +1492,8 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - addDeleteOperationToTranslog(delete, deleteResult); + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1516,30 +1517,6 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } - @Override - public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ - try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { - DeletionStrategy plan = deletionStrategyForOperation(delete); - DeleteResult deleteResult = new DeleteResult( - plan.versionOfDeletion, - delete.primaryTerm(), - delete.seqNo(), - plan.currentlyDeleted == false - ); - addDeleteOperationToTranslog(delete, deleteResult); - deleteResult.setTook(System.nanoTime() - delete.startTime()); - deleteResult.freeze(); - return deleteResult; - } - } - - private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException{ - if(deleteResult.getResultType() == Result.Type.SUCCESS){ - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - 
deleteResult.setTranslogLocation(location); - } - } - private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index 8e6ac4ec66486..dc9ed0219eec1 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -92,9 +92,6 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re // If not using NRT repl. if (currentInfos == null) { reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); - if (reader != null) { - logger.info("Num docs primary {}", reader.getDelegate().numDocs()); - } } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index b2aebb41acb69..cd04cb9a7a54c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,12 +306,6 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } - @Override - public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ - assert false : "this should not be called"; - throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); - } - @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 159c25b1c5ae0..f2a071248ef55 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1139,9 +1139,6 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); - if(origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()){ - return getEngine().addDeleteOperationToTranslog(delete); - } return delete(engine, delete); } From 225417619ae7d179c4ae7002060bd79be2af93cd Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 5 Apr 2022 16:59:48 -0700 Subject: [PATCH 5/7] Fix Segment Replication integ tests (#2637) * Fix Segment replication integ tests by using a single BackgroundIndexer. BackgroundIndexers begin indexing docs with a docId of 0 up to the requested numDocs. Using two overlapped the docIds so counts were incorrect. This changes our tests to use a single indexer, it also improves assertions on segment data on both shards. Signed-off-by: Marc Handalian * Fix Index shards to correctly compare checkpoint version instead of segment gen when determining if a received checkpoint should be processed. Without this change replicas will ignore checkpoints for merges. 
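A condensed sketch of that comparison, using simplified stand-in types rather than the actual ReplicationCheckpoint class, to show why the SegmentInfos version (which advances on merges and refreshes) is the field to compare rather than the segments generation (which only changes on commit):

    // Simplified stand-in for ReplicationCheckpoint; only the fields needed for the comparison.
    final class Checkpoint {
        final long primaryTerm;
        final long segmentsGen;
        final long version;

        Checkpoint(long primaryTerm, long segmentsGen, long version) {
            this.primaryTerm = primaryTerm;
            this.segmentsGen = segmentsGen;
            this.version = version;
        }

        // Mirrors the patch: a null "other" (nothing processed yet) counts as behind.
        boolean isAheadOf(Checkpoint other) {
            return other == null || version > other.version;
        }
    }

    public class CheckpointComparisonSketch {
        public static void main(String[] args) {
            Checkpoint replica = new Checkpoint(1, 5, 10);
            // After a merge the primary's in-memory SegmentInfos version advances even
            // though no new commit was written, so the generation stays at 5.
            Checkpoint primaryAfterMerge = new Checkpoint(1, 5, 11);

            // A generation-based comparison would treat the two as equal and skip replication.
            System.out.println("same generation: " + (replica.segmentsGen == primaryAfterMerge.segmentsGen)); // true
            // A version-based comparison correctly flags the primary as ahead.
            System.out.println("primary ahead:   " + primaryAfterMerge.isAheadOf(replica));                   // true
        }
    }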
Signed-off-by: Marc Handalian * Updated based on PR feedback. Rename latestUnprocessedCheckpoint to latestReceivedCheckpoint. Remove redundant checkpoint variable in onNewCheckpoint. Rename isValidCheckpoint to shouldProcessCheckpoint. Add missing isAheadOf check to shouldProcessCheckpoint. Update SegmentReplicationIT's assertSegmentStats to be more readable. Removed latest seqNo hack. Signed-off-by: Marc Handalian Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 176 ++++++++++++++---- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 22 ++- .../org/opensearch/index/engine/Segment.java | 13 +- .../opensearch/index/shard/IndexShard.java | 72 ++++--- .../org/opensearch/index/store/Store.java | 8 +- .../indices/replication/copy/CopyState.java | 3 +- .../copy/ReplicationCheckpoint.java | 20 +- .../copy/SegmentReplicationTarget.java | 18 +- 9 files changed, 251 insertions(+), 83 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index f7d0de08ed65c..cf80120bc44a3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,14 +8,28 @@ package org.opensearch.indices.replication; +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.Assert; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexModule; +import org.opensearch.index.engine.Segment; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -44,24 +58,39 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + + // wait a short amount of time to give replication a chance to complete. 
+ Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); } - refresh(INDEX_NAME); - // wait a short amount of time to give replication a chance to complete. - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - - final int additionalDocCount = scaledRandomIntBetween(0, 200); - final int totalDocs = initialDocCount + additionalDocCount; - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { - waitForDocs(additionalDocCount, indexer); - } - flush(INDEX_NAME); - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } public void testReplicationAfterForceMerge() throws Exception { @@ -71,29 +100,48 @@ public void testReplicationAfterForceMerge() throws Exception { ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); + + flush(INDEX_NAME); + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + + // Index a second set of docs so we can merge into one segment. + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that replicas preserve these files so the local store is not corrupt. 
+ client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + Thread.sleep(1000); + refresh(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); } - flush(INDEX_NAME); - // wait a short amount of time to give replication a chance to complete. - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - - final int additionalDocCount = scaledRandomIntBetween(0, 200); - final int totalDocs = initialDocCount + additionalDocCount; - try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { - waitForDocs(additionalDocCount, indexer); - } - // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. - // This case tests that replicas preserve these files so the local store is not corrupt. - client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); - Thread.sleep(1000); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } - public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { + public void testReplicaSetupAfterPrimaryIndexesDocs() { final String nodeA = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -122,5 +170,63 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { ensureGreen(INDEX_NAME); assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertSegmentStats(REPLICA_COUNT); + } + + private void assertSegmentStats(int numberOfReplicas) { + client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() { + @Override + public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { + + List segmentsByIndex = indicesSegmentResponse.getIndices() + .values() + .stream() // get list of IndexSegments + .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group + .map(IndexShardSegments::getShards) // get list of segments across replication group + .collect(Collectors.toList()); + + // There will be an entry in the list for each index. + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + + // Separate Primary & replica shards ShardSegments. 
+ final Map> segmentListMap = Arrays.stream(replicationGroupSegments) + .collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegments = segmentListMap.get(false); + + assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + + // create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on + // replicas. + final Map primarySegmentsMap = primaryShardSegments.getSegments() + .stream() + .collect(Collectors.toMap(Segment::getName, Function.identity())); + // For every replica, ensure that its segments are in the same state as on the primary. + // It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a + // list comparison. + // This equality check includes search/committed properties on the Segment. Combined with docCount checks, + // this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest + // commit point, even if they are not searchable. + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegments.size() + ); + + for (ShardSegments shardSegment : replicaShardSegments) { + for (Segment replicaSegment : shardSegment.getSegments()) { + final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName()); + assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment); + } + } + } + } + + @Override + public void onFailure(Exception e) { + Assert.fail("Error fetching segment stats"); + } + }); } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 86b3b835c88e5..b3af769dc0ccf 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {} + public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {} public long getProcessedLocalCheckpoint() { return 0L; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 2c161e511edae..16fa8a80ec3e0 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -90,6 +90,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; @@ -322,11 +323,28 @@ public InternalEngine(EngineConfig engineConfig) { } @Override - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot 
expectedMetadata, long seqNo) + throws IOException { assert engineConfig.isReadOnly() : "Only replicas should update Infos"; + + store.incRef(); + try { + refreshLastCommittedSegmentInfos(); + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify( + "finalize - clean with in memory infos", + expectedMetadata, + store.getMetadata(infos), + store.getMetadata(lastCommittedSegmentInfos) + ); + } finally { + store.decRef(); + } + // Update the current infos reference on the Engine's reader. externalReaderManager.internalReaderManager.updateSegments(infos); - externalReaderManager.maybeRefresh(); localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + externalReaderManager.maybeRefresh(); } private LocalCheckpointTracker createLocalCheckpointTracker( diff --git a/server/src/main/java/org/opensearch/index/engine/Segment.java b/server/src/main/java/org/opensearch/index/engine/Segment.java index 4874d0a30196f..a63f39af3462a 100644 --- a/server/src/main/java/org/opensearch/index/engine/Segment.java +++ b/server/src/main/java/org/opensearch/index/engine/Segment.java @@ -178,8 +178,17 @@ public boolean equals(Object o) { Segment segment = (Segment) o; - return Objects.equals(name, segment.name); - + return Objects.equals(name, segment.name) + && Objects.equals(docCount, segment.docCount) + && Objects.equals(delDocCount, segment.delDocCount) + && Objects.equals(sizeInBytes, segment.sizeInBytes) + && Objects.equals(search, segment.search) + && Objects.equals(committed, segment.committed) + && Objects.equals(attributes, segment.attributes) + && Objects.equals(version, segment.version) + && Objects.equals(compound, segment.compound) + && Objects.equals(mergeId, segment.mergeId) + && Objects.equals(generation, segment.generation); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2a071248ef55..abe0471d5251c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() { private final CheckpointRefreshListener checkpointRefreshListener; + private volatile ReplicationCheckpoint latestReceivedCheckpoint; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -1439,16 +1441,18 @@ public SegmentInfos getLatestSegmentInfos() { } public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); return new ReplicationCheckpoint( this.shardId, getOperationPrimaryTerm(), - getLatestSegmentInfos().getGeneration(), - getProcessedLocalCheckpoint() + latestSegmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + latestSegmentInfos.getVersion() ); } - public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { - getEngine().updateCurrentInfos(infos, seqNo); + public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException { + getEngine().finalizeReplication(infos, expectedMetadata, seqNo); } /** @@ -3652,32 +3656,36 @@ public synchronized void onNewCheckpoint( final PrimaryShardReplicationSource source, final SegmentReplicationReplicaService segmentReplicationReplicaService ) { - logger.debug("Checkpoint received {}", request.getCheckpoint()); - ReplicationCheckpoint 
localCheckpoint = getLatestReplicationCheckpoint(); - logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint()); - if (localCheckpoint.equals(request.getCheckpoint())) { - logger.debug("Ignore - Shard is already on checkpoint"); - return; - } - if (state.equals(IndexShardState.STARTED) == false) { - logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); - return; - } - if (isReplicating()) { - logger.debug("Ignore - shard is currently replicating to a checkpoint"); - return; + final ReplicationCheckpoint requestCheckpoint = request.getCheckpoint(); + logger.debug("Checkpoint received {}", requestCheckpoint); + + if (requestCheckpoint.isAheadOf(latestReceivedCheckpoint)) { + latestReceivedCheckpoint = requestCheckpoint; } + + if (shouldProcessCheckpoint(requestCheckpoint) == false) return; try { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received new checkpoint {}", checkpoint); + logger.debug("Processing new checkpoint {}", requestCheckpoint); segmentReplicationReplicaService.startReplication( - checkpoint, + requestCheckpoint, this, source, new SegmentReplicationReplicaService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); + // if we received a checkpoint during the copy event that is ahead of this + // try and process it. + if (latestReceivedCheckpoint.isAheadOf(getLatestReplicationCheckpoint())) { + threadPool.generic() + .execute( + () -> onNewCheckpoint( + new PublishCheckpointRequest(latestReceivedCheckpoint), + source, + segmentReplicationReplicaService + ) + ); + } } @Override @@ -3695,6 +3703,28 @@ public void onReplicationFailure( } } + private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint()); + if (state.equals(IndexShardState.STARTED) == false) { + logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state); + return false; + } + if (isReplicating()) { + logger.debug("Ignore - shard is currently replicating to a checkpoint"); + return false; + } + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint); + return false; + } + return true; + } + public SegmentReplicationState getReplicationState() { return this.segRepState; } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index e89abab2d1c49..55ae60051b0b1 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -691,12 +691,16 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @param localSnapshot The local snapshot from in memory SegmentInfos. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. 
*/ - public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { + public void cleanupAndVerify( + String reason, + MetadataSnapshot remoteSnapshot, + MetadataSnapshot localSnapshot, + MetadataSnapshot latestCommitPointMetadata + ) throws IOException { // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata); verifyAfterCleanup(remoteSnapshot, localSnapshot); } finally { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 1fa599c4bda49..5d8e0700ecce3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -43,7 +43,8 @@ public class CopyState extends AbstractRefCounted { shardId, shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), - shard.getProcessedLocalCheckpoint() + shard.getProcessedLocalCheckpoint(), + segmentInfos.getVersion() ); // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java index eb990078f35b9..38b4099a7755e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.copy; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -22,12 +23,14 @@ public class ReplicationCheckpoint implements Writeable { private final long primaryTerm; private final long segmentsGen; private final long seqNo; + private final long version; - public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segments_gen, long seqNo) { + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long version) { this.shardId = shardId; this.primaryTerm = primaryTerm; - this.segmentsGen = segments_gen; + this.segmentsGen = segmentsGen; this.seqNo = seqNo; + this.version = version; } public ReplicationCheckpoint(StreamInput in) throws IOException { @@ -35,6 +38,7 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { primaryTerm = in.readLong(); segmentsGen = in.readLong(); seqNo = in.readLong(); + version = in.readLong(); } public long getPrimaryTerm() { @@ -45,6 +49,10 @@ public long getSegmentsGen() { return segmentsGen; } + public long getVersion() { + return version; + } + public long getSeqNo() { return seqNo; } @@ -59,6 +67,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(primaryTerm); out.writeLong(segmentsGen); out.writeLong(seqNo); + out.writeLong(version); } @Override @@ -69,6 +78,7 @@ public boolean equals(Object o) { return primaryTerm == 
that.primaryTerm && segmentsGen == that.segmentsGen && seqNo == that.seqNo + && version == that.version && Objects.equals(shardId, that.shardId); } @@ -77,6 +87,10 @@ public int hashCode() { return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo); } + public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { + return other == null || version > other.getVersion(); + } + @Override public String toString() { return "ReplicationCheckpoint{" @@ -88,6 +102,8 @@ public String toString() { + segmentsGen + ", seqNo=" + seqNo + + ", version=" + + version + '}'; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index 2c8d1e19c8c92..e8fe555bff334 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -177,14 +177,6 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR final Store store = store(); store.incRef(); try { - if (indexShard.getRetentionLeases().leases().isEmpty()) { - // if empty, may be a fresh IndexShard, so write an empty leases file to disk - indexShard.persistRetentionLeases(); - assert indexShard.loadRetentionLeases().leases().isEmpty(); - } else { - assert indexShard.assertRetentionLeasesPersisted(); - } - // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); SegmentInfos infos = SegmentInfos.readCommit( @@ -192,16 +184,8 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); - // clean up the local store of old segment files - // and validate the latest segment infos against the snapshot sent from the primary shard. - store.cleanupAndVerify( - "finalize - clean with in memory infos", - checkpointInfoResponse.getSnapshot(), - store.getMetadata(infos) - ); - // Update the current infos reference on the Engine's reader. - indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos, checkpointInfoResponse.getSnapshot(), responseCheckpoint.getSeqNo()); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. 
// this means we transferred files from the remote that have not be checksummed and they are From df7a7776acce4994949818637c3702dcca5af36d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 6 Apr 2022 23:53:30 +0000 Subject: [PATCH 6/7] Delete Operations for Replicas when Segment Replication is toggled Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 41 +++++++++++++++++++ .../org/opensearch/index/engine/Engine.java | 2 + .../index/engine/InternalEngine.java | 29 +++++++++++-- .../index/engine/OpenSearchReaderManager.java | 3 ++ .../index/engine/ReadOnlyEngine.java | 6 +++ .../opensearch/index/shard/IndexShard.java | 3 ++ 6 files changed, 81 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index cf80120bc44a3..f613c67524cab 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -229,4 +229,45 @@ public void onFailure(Exception e) { } }); } + + public void testDelOps()throws Exception{ + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build() + ); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME) + .setId("1") + .setSource("foo", "bar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("fooo", "baar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // Now delete with blockUntilRefresh + client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index b3af769dc0ccf..ee1b04b7480da 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,6 +369,8 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; + public abstract 
DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; + public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 16fa8a80ec3e0..fa32c81639e64 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -704,7 +704,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. if (engineConfig.isReadOnly()) { - return (SoftDeletesDirectoryReaderWrapper)DirectoryReader.open(store.directory()); + return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } return DirectoryReader.open(indexWriter); } @@ -1510,8 +1510,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - deleteResult.setTranslogLocation(location); + addDeleteOperationToTranslog(delete, deleteResult); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1535,6 +1534,30 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } + @Override + public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { + DeletionStrategy plan = deletionStrategyForOperation(delete); + DeleteResult deleteResult = new DeleteResult( + plan.versionOfDeletion, + delete.primaryTerm(), + delete.seqNo(), + plan.currentlyDeleted == false + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + return deleteResult; + } + } + + private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException{ + if(deleteResult.getResultType() == Result.Type.SUCCESS){ + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index dc9ed0219eec1..8e6ac4ec66486 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -92,6 +92,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re // If not using NRT repl. 
if (currentInfos == null) { reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); + if (reader != null) { + logger.info("Num docs primary {}", reader.getDelegate().numDocs()); + } } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cd04cb9a7a54c..b2aebb41acb69 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } + @Override + public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index abe0471d5251c..174d9f6621ae2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + if(origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()){ + return getEngine().addDeleteOperationToTranslog(delete); + } return delete(engine, delete); } From bdc071dee1d2bb76d2b7902fdb36e709462e0307 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 11 Apr 2022 13:40:10 +0000 Subject: [PATCH 7/7] Adding comments and removing log statements Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationIT.java | 2 +- .../java/org/opensearch/index/engine/InternalEngine.java | 6 ++++-- .../opensearch/index/engine/OpenSearchReaderManager.java | 7 ------- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index f613c67524cab..b0a26b88f38e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -230,7 +230,7 @@ public void onFailure(Exception e) { }); } - public void testDelOps()throws Exception{ + public void testDeleteOperations()throws Exception{ final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fa32c81639e64..1ccd52f9652d8 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ 
-702,11 +702,12 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } private DirectoryReader getDirectoryReader() throws IOException { - // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. + // We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for deletions if (engineConfig.isReadOnly()) { return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } - return DirectoryReader.open(indexWriter); + return DirectoryReader.open(indexWriter, true, true); } @Override @@ -2295,6 +2296,7 @@ public SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = externalReaderManager.internalReaderManager.acquire(); + // This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment replication is enabled if (engineConfig.isReadOnly()) { return ((StandardDirectoryReader)((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()).getSegmentInfos(); } diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index 8e6ac4ec66486..91404ecf62de8 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -92,18 +92,11 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re // If not using NRT repl. if (currentInfos == null) { reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); - if (reader != null) { - logger.info("Num docs primary {}", reader.getDelegate().numDocs()); - } } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(innerReader, Lucene.SOFT_DELETES_FIELD); - logger.info("Doc count {}", softDeletesDirectoryReaderWrapper.numDocs()); - logger.info("Deleted doc count {}", softDeletesDirectoryReaderWrapper.numDeletedDocs()); reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); - logger.info("reader Doc count {}", reader.numDocs()); - logger.info("reader Deleted doc count {}", reader.numDeletedDocs()); logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); } return reader;
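
Patches 6 and 7 together settle on the following shape for replica deletes under segment replication: the replica records the operation in its translog for durability but does not apply it to Lucene, and the document actually disappears once the primary's soft-delete-bearing segments are copied over and the wrapped reader is refreshed. A condensed sketch of that routing, with hypothetical stand-in types rather than the real Engine/IndexShard classes:

    // Hypothetical stand-ins; the real classes live in org.opensearch.index.engine / index.shard.
    final class DeleteOp {
        final String id;
        DeleteOp(String id) { this.id = id; }
    }

    interface DeleteEngine {
        // Primary-side path: apply the delete to Lucene and record it in the translog.
        String delete(DeleteOp op);
        // Replica-side path under segment replication: record the operation in the translog only.
        String addDeleteOperationToTranslog(DeleteOp op);
    }

    public class ReplicaDeleteRoutingSketch {
        // Mirrors the branch patch 6 adds to IndexShard#applyDeleteOperation.
        static String applyDeleteOperation(DeleteEngine engine, DeleteOp op, boolean isReplica, boolean segrepEnabled) {
            if (isReplica && segrepEnabled) {
                return engine.addDeleteOperationToTranslog(op);
            }
            return engine.delete(op);
        }

        public static void main(String[] args) {
            DeleteEngine engine = new DeleteEngine() {
                @Override
                public String delete(DeleteOp op) {
                    return "lucene delete + translog entry for " + op.id;
                }

                @Override
                public String addDeleteOperationToTranslog(DeleteOp op) {
                    return "translog entry only for " + op.id;
                }
            };
            System.out.println(applyDeleteOperation(engine, new DeleteOp("1"), false, true)); // primary: full delete
            System.out.println(applyDeleteOperation(engine, new DeleteOp("1"), true, true));  // replica: translog only
        }
    }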