diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index cf80120bc44a3..7fd80340ec7bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -229,4 +229,34 @@ public void onFailure(Exception e) { } }); } + + public void testDeleteOperations() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("fooo", "baar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // Now delete with blockUntilRefresh + client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index b3af769dc0ccf..ee1b04b7480da 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,6 +369,8 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; + public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; + public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fe89e230d0635..2bd5998def810 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -50,6 +50,7 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.index.Term; import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy; import org.apache.lucene.search.BooleanClause; @@ -717,9 +718,11 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } private DirectoryReader getDirectoryReader() throws IOException { - // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + // for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas. + // We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft + // deletes if (engineConfig.isReadOnly()) { - return DirectoryReader.open(store.directory()); + return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } return DirectoryReader.open(indexWriter); } @@ -1525,8 +1528,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - deleteResult.setTranslogLocation(location); + addDeleteOperationToTranslog(delete, deleteResult); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1550,6 +1552,30 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } + @Override + public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException { + try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { + DeletionStrategy plan = deletionStrategyForOperation(delete); + DeleteResult deleteResult = new DeleteResult( + plan.versionOfDeletion, + delete.primaryTerm(), + delete.seqNo(), + plan.currentlyDeleted == false + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + return deleteResult; + } + } + + private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException { + if (deleteResult.getResultType() == Result.Type.SUCCESS) { + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; @@ -2287,6 +2313,12 @@ public SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = externalReaderManager.internalReaderManager.acquire(); + /* This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment + replication is enabled */ + if (engineConfig.isReadOnly()) { + return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate()) + .getSegmentInfos(); + } return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index a0f9f9c5d7a5d..9da5d99f7a719 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -39,9 +39,11 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import java.io.IOException; @@ -98,8 +100,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); - reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId()); + final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( + innerReader, + Lucene.SOFT_DELETES_FIELD + ); + reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); + } return reader; } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cd04cb9a7a54c..aba46d16344c7 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } + @Override + public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException { + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index abe0471d5251c..0cb4c528f460b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + if (origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()) { + return getEngine().addDeleteOperationToTranslog(delete); + } return delete(engine, delete); }