Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Operations on Replicas when Segment Replication is enabled #2839

Merged
merged 8 commits into from
Apr 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,41 @@ public void onFailure(Exception e) {
}
});
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();

createIndex(
INDEX_NAME,
Settings.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this on the last PR. You shouldn't need to set these settings here because we override the indexSettings() method at the top of this file.

        createIndex(INDEX_NAME);

Will create the index with the default settings we override.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry I too missed it

.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("fooo", "baar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

// Now delete with blockUntilRefresh
client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ public Condition newCondition() {
*/
public abstract DeleteResult delete(Delete delete) throws IOException;

public abstract DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException;

public abstract NoOpResult noOp(NoOp noOp) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
Expand Down Expand Up @@ -716,11 +717,13 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}

private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
// for segment replication: replicas should create the reader from store and, we don't want an open IW on replicas.
/* We should always wrap replicas with a SoftDeletesDirectoryReaderWrapper as we use soft deletes when segment replication is on for
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - "We should always wrap the DirectoryReader used on replicas with SoftDeletesDirectoryReaderWrapper so that we filter out soft deletes.

deletions */
if (engineConfig.isReadOnly()) {
return DirectoryReader.open(store.directory());
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}
return DirectoryReader.open(indexWriter);
return DirectoryReader.open(indexWriter, true, true);
Copy link
Member

@mch2 mch2 Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this required? does your test pass without this like with the first PR?

I forgot this also impacts the reader with segrep off - if this isn't required with segrep on then we should remove it. Otherwise it needs to be gated by reading the setting. It is changing how the primary behaves with segrep off.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't required with segrep on. I think we remove it

}

@Override
Expand Down Expand Up @@ -1524,8 +1527,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
addDeleteOperationToTranslog(delete, deleteResult);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
if (deleteResult.getTranslogLocation() == null) {
Expand All @@ -1549,6 +1551,30 @@ public DeleteResult delete(Delete delete) throws IOException {
return deleteResult;
}

@Override
public Engine.DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
DeletionStrategy plan = deletionStrategyForOperation(delete);
DeleteResult deleteResult = new DeleteResult(
plan.versionOfDeletion,
delete.primaryTerm(),
delete.seqNo(),
plan.currentlyDeleted == false
);
addDeleteOperationToTranslog(delete, deleteResult);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
}
}

private void addDeleteOperationToTranslog(Delete delete, DeleteResult deleteResult) throws IOException {
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
}

private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) {
assert operation.origin() == Operation.Origin.PRIMARY : operation;
assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation;
Expand Down Expand Up @@ -2286,6 +2312,12 @@ public SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = externalReaderManager.internalReaderManager.acquire();
/* This is safe, as we always wrap Standard reader with a SoftDeletesDirectoryReaderWrapper for replicas when segment
replication is enabled */
if (engineConfig.isReadOnly()) {
return ((StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) reader.getDelegate()).getDelegate())
.getSegmentInfos();
}
return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;

import java.io.IOException;
Expand Down Expand Up @@ -98,8 +100,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
} else {
// Open a new reader, sharing any common segment readers with the old one:
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
reader = OpenSearchDirectoryReader.wrap(innerReader, referenceToRefresh.shardId());
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
Lucene.SOFT_DELETES_FIELD
);
reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader);

}
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException("deletes are not supported on a read-only engine");
}

@Override
public DeleteResult addDeleteOperationToTranslog(Delete delete) throws IOException {
assert false : "this should not be called";
throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
assert false : "this should not be called";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,9 @@ private Engine.DeleteResult applyDeleteOperation(
+ "]";
ensureWriteAllowed(origin);
final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
if (origin.equals(Engine.Operation.Origin.REPLICA) && indexSettings.isSegrepEnabled()) {
return getEngine().addDeleteOperationToTranslog(delete);
}
return delete(engine, delete);
}

Expand Down