From 5ccdee8df912bd3bc8610bf108a3220723cf26d5 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 11 Apr 2022 15:06:45 +0000 Subject: [PATCH 1/7] Fix Delete Operations on replica when Segment Replication is enabled Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 37 ++++++++++++++++ .../org/opensearch/index/engine/Engine.java | 2 + .../index/engine/InternalEngine.java | 42 ++++++++++++++++--- .../index/engine/OpenSearchReaderManager.java | 9 +++- .../index/engine/ReadOnlyEngine.java | 6 +++ .../opensearch/index/shard/IndexShard.java | 3 ++ 6 files changed, 93 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index cf80120bc44a3..286fb1ebae74d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -229,4 +229,41 @@ public void onFailure(Exception e) { } }); } + + public void testDeleteOperations() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build() + ); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("fooo", "baar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // Now delete with blockUntilRefresh + client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index b3af769dc0ccf..ee1b04b7480da 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,6 +369,8 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; + public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; + public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 7859af0e22f64..4690771f5f181 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -50,6 +50,7 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; @@ -716,11 +717,13 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } private DirectoryReader getDirectoryReader() throws IOException { - // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. + // We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for + // deletions if (engineConfig.isReadOnly()) { - return DirectoryReader.open(store.directory()); + return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } - return DirectoryReader.open(indexWriter); + return DirectoryReader.open(indexWriter, true, true); } @Override @@ -1524,8 +1527,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - deleteResult.setTranslogLocation(location); + addDeleteOperationToTranslog(delete, deleteResult); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1549,6 +1551,30 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } + @Override + public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException { + try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { + DeletionStrategy plan = deletionStrategyForOperation(delete); + DeleteResult deleteResult = new DeleteResult( + plan.versionOfDeletion, + delete.primaryTerm(), + delete.seqNo(), + plan.currentlyDeleted == false + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + return deleteResult; + } + } + + private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException { + if (deleteResult.getResultType() == Result.Type.SUCCESS) { + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; @@ -2286,6 +2312,12 @@ public SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = externalReaderManager.internalReaderManager.acquire(); + // This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment + // replication is enabled + if (engineConfig.isReadOnly()) { + return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()) + .getSegmentInfos(); + } return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index a0f9f9c5d7a5d..9da5d99f7a719 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -39,9 +39,11 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import java.io.IOException; @@ -98,8 +100,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); - reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId()); + final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( + innerReader, + Lucene.SOFT_DELETES_FIELD + ); + reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); + } return reader; } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cd04cb9a7a54c..aba46d16344c7 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } + @Override + public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException { + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index abe0471d5251c..0cb4c528f460b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + if (origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()) { + return getEngine().addDeleteOperationToTranslog(delete); + } return delete(engine, delete); } From 1670ba1d6f4b3dd05cd4d5970dfadac92bd78f8a Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 11 Apr 2022 15:18:44 +0000 Subject: [PATCH 2/7] Updating comments Signed-off-by: Rishikesh1159 --- .../java/org/opensearch/index/engine/InternalEngine.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 4690771f5f181..20dc03684d5ad 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -718,8 +718,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. - // We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for - // deletions + /* We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for + deletions */ if (engineConfig.isReadOnly()) { return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } @@ -2312,8 +2312,8 @@ public SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = externalReaderManager.internalReaderManager.acquire(); - // This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment - // replication is enabled + /* This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment + replication is enabled */ if (engineConfig.isReadOnly()) { return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()) .getSegmentInfos(); From abced8c1018703c6640496c1b7806d87d139bdbf Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 12 Apr 2022 19:59:51 +0000 Subject: [PATCH 3/7] Updating comment and small refactoring Signed-off-by: Rishikesh1159 --- .idea/vcs.xml | 30 +++++++++---------- .../replication/SegmentReplicationIT.java | 9 +----- .../index/engine/InternalEngine.java | 5 ++-- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 48557884a8893..3b802c80a7487 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 286fb1ebae74d..7fd80340ec7bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -234,14 +234,7 @@ public void testDeleteOperations() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) - .build() - ); + createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 20dc03684d5ad..97c02256e60ef 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -718,12 +718,11 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. - /* We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for - deletions */ + // We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft deletes if (engineConfig.isReadOnly()) { return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } - return DirectoryReader.open(indexWriter, true, true); + return DirectoryReader.open(indexWriter); } @Override From 9da9d293bc930c62193b0db995a1f431a91fb0a0 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 12 Apr 2022 20:09:53 +0000 Subject: [PATCH 4/7] Spotless check apply Signed-off-by: Rishikesh1159 --- .../main/java/org/opensearch/index/engine/InternalEngine.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 97c02256e60ef..222acdb926c3b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -718,7 +718,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. - // We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft deletes + // We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft + // deletes if (engineConfig.isReadOnly()) { return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } From 5a0f68b567b19b45472d1c163bead07e092b3871 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 13 Apr 2022 00:37:41 +0000 Subject: [PATCH 5/7] Fixing Indentation in vcs.xml file Signed-off-by: Rishikesh1159 --- .idea/vcs.xml | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 3b802c80a7487..8d0785935f4ee 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - - - - - \ No newline at end of file + + + + + + + From 9948fa0a7c4cdfa8dda72f1750f825e06c8ce7a6 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham Date: Tue, 12 Apr 2022 21:55:21 -0700 Subject: [PATCH 6/7] Fixing indentation in vcs.xml Signed-off-by: Rishikesh Pasham --- .idea/vcs.xml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 8d0785935f4ee..adb10ac5edbc6 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -14,7 +14,7 @@ - - - - + + + + \ No newline at end of file From e7e3c316f568e345af6d45f93b2cb689efa67741 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham Date: Tue, 12 Apr 2022 21:59:44 -0700 Subject: [PATCH 7/7] Fixing new line indentation in vcs.xml Signed-off-by: Rishikesh Pasham --- .idea/vcs.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index adb10ac5edbc6..48557884a8893 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -17,4 +17,4 @@ - \ No newline at end of file +