Skip to content

Commit

Permalink
Delete Operations for Replicas when Segment Replication is toggled
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Apr 11, 2022
1 parent 2254176 commit df7a777
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,45 @@ public void onFailure(Exception e) {
}
});
}

public void testDelOps()throws Exception{
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();

createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
.setSource("foo", "bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("fooo", "baar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

// Now delete with blockUntilRefresh
client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
}
}
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ public Condition newCondition() {
*/
public abstract DeleteResult delete(Delete delete) throws IOException;

public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException;

public abstract NoOpResult noOp(NoOp noOp) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.isReadOnly()) {
return (SoftDeletesDirectoryReaderWrapper)DirectoryReader.open(store.directory());
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}
return DirectoryReader.open(indexWriter);
}
Expand Down Expand Up @@ -1510,8 +1510,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
addDeleteOperationToTranslog(delete, deleteResult);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
if (deleteResult.getTranslogLocation() == null) {
Expand All @@ -1535,6 +1534,30 @@ public DeleteResult delete(Delete delete) throws IOException {
return deleteResult;
}

@Override
public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{
try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
DeletionStrategy plan = deletionStrategyForOperation(delete);
DeleteResult deleteResult = new DeleteResult(
plan.versionOfDeletion,
delete.primaryTerm(),
delete.seqNo(),
plan.currentlyDeleted == false
);
addDeleteOperationToTranslog(delete, deleteResult);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
}
}

private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException{
if(deleteResult.getResultType() == Result.Type.SUCCESS){
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
}

private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) {
assert operation.origin() == Operation.Origin.PRIMARY : operation;
assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
// If not using NRT repl.
if (currentInfos == null) {
reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
if (reader != null) {
logger.info("Num docs primary {}", reader.getDelegate().numDocs());
}
} else {
// Open a new reader, sharing any common segment readers with the old one:
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
}

@Override
public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException{
assert false : "this should not be called";
throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
assert false : "this should not be called";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation(
+ "]";
ensureWriteAllowed(origin);
final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
if(origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()){
return getEngine().addDeleteOperationToTranslog(delete);
}
return delete(engine, delete);
}

Expand Down

0 comments on commit df7a777

Please sign in to comment.