Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Operations on Replicas when Segment Replication is enabled #2839

Merged
merged 8 commits into from
Apr 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,34 @@ public void onFailure(Exception e) {
}
});
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("fooo", "baar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

// Now delete with blockUntilRefresh
client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ public Condition newCondition() {
*/
public abstract DeleteResult delete(Delete delete) throws IOException;

public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException;

public abstract NoOpResult noOp(NoOp noOp) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -717,9 +718,11 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}

private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
// for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas.
// We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft
// deletes
if (engineConfig.isReadOnly()) {
return DirectoryReader.open(store.directory());
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}
return DirectoryReader.open(indexWriter);
}
Expand Down Expand Up @@ -1525,8 +1528,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
addDeleteOperationToTranslog(delete, deleteResult);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
if (deleteResult.getTranslogLocation() == null) {
Expand All @@ -1550,6 +1552,30 @@ public DeleteResult delete(Delete delete) throws IOException {
return deleteResult;
}

@Override
public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
DeletionStrategy plan = deletionStrategyForOperation(delete);
DeleteResult deleteResult = new DeleteResult(
plan.versionOfDeletion,
delete.primaryTerm(),
delete.seqNo(),
plan.currentlyDeleted == false
);
addDeleteOperationToTranslog(delete, deleteResult);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
}
}

private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException {
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
}

private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) {
assert operation.origin() == Operation.Origin.PRIMARY : operation;
assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation;
Expand Down Expand Up @@ -2287,6 +2313,12 @@ public SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = externalReaderManager.internalReaderManager.acquire();
/* This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment
replication is enabled */
if (engineConfig.isReadOnly()) {
return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate())
.getSegmentInfos();
}
return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;

import java.io.IOException;
Expand Down Expand Up @@ -98,8 +100,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
} else {
// Open a new reader, sharing any common segment readers with the old one:
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId());
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
Lucene.SOFT_DELETES_FIELD
);
reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader);

}
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
}

@Override
public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
assert false : "this should not be called";
throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
assert false : "this should not be called";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation(
+ "]";
ensureWriteAllowed(origin);
final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
if (origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()) {
return getEngine().addDeleteOperationToTranslog(delete);
}
return delete(engine, delete);
}

Expand Down