diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index cf80120bc44a3..f613c67524cab 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -229,4 +229,45 @@ public void onFailure(Exception e) { } }); } + + public void testDelOps()throws Exception{ + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build() + ); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME) + .setId("1") + .setSource("foo", "bar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("fooo", "baar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // Now delete with blockUntilRefresh + client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index b3af769dc0ccf..ee1b04b7480da 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -369,6 +369,8 @@ public Condition newCondition() { */ public abstract DeleteResult delete(Delete delete) throws IOException; + public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException; + public abstract NoOpResult noOp(NoOp noOp) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 16fa8a80ec3e0..fa32c81639e64 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -704,7 +704,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. if (engineConfig.isReadOnly()) { - return (SoftDeletesDirectoryReaderWrapper)DirectoryReader.open(store.directory()); + return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } return DirectoryReader.open(indexWriter); } @@ -1510,8 +1510,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); - deleteResult.setTranslogLocation(location); + addDeleteOperationToTranslog(delete, deleteResult); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); if (deleteResult.getTranslogLocation() == null) { @@ -1535,6 +1534,30 @@ public DeleteResult delete(Delete delete) throws IOException { return deleteResult; } + @Override + public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) { + DeletionStrategy plan = deletionStrategyForOperation(delete); + DeleteResult deleteResult = new DeleteResult( + plan.versionOfDeletion, + delete.primaryTerm(), + delete.seqNo(), + plan.currentlyDeleted == false + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + return deleteResult; + } + } + + private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException{ + if(deleteResult.getResultType() == Result.Type.SUCCESS){ + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { assert operation.origin() == Operation.Origin.PRIMARY : operation; assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index dc9ed0219eec1..8e6ac4ec66486 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -92,6 +92,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re // If not using NRT repl. if (currentInfos == null) { reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); + if (reader != null) { + logger.info("Num docs primary {}", reader.getDelegate().numDocs()); + } } else { // Open a new reader, sharing any common segment readers with the old one: DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index cd04cb9a7a54c..b2aebb41acb69 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException("deletes are not supported on a read-only engine"); } + @Override + public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{ + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public NoOpResult noOp(NoOp noOp) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index abe0471d5251c..174d9f6621ae2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation( + "]"; ensureWriteAllowed(origin); final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + if(origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()){ + return getEngine().addDeleteOperationToTranslog(delete); + } return delete(engine, delete); }