Skip to content

Commit

Permalink
Prevent Lucene indexing operations on replicas, gated by an index set…
Browse files Browse the repository at this point in the history
…ting (#1935)

* Prevent Lucene indexing operations on replica

This change prevents Lucene indexing operations from occurring on replicas, while still writing to the replica translog. Note that the change only affects indexing operations. The logic in IndexShard has been changed to only write to the translog when applying the indexing operation to a replica shard. To allow this, a new addIndexOperationToTranslog API is exposed on the Engine class, which InternalEngine implements using existing translog logic. This code change also refactors the existing indexing logic in InternalEngine into separate internal methods - one that parses the document and prepares the Index object, and another for error handling.

Signed-off-by: Kartik Ganesh <[email protected]>

* Added index setting for segment replication

For now, this is a boolean final value that simply turns segment replication on/off. The logic in IndexShard has been updated to branch based on this value.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg authored Jan 25, 2022
1 parent 0f31d62 commit 4822c28
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,16 @@ public final class IndexSettings {
Setting.Property.IndexScope
);

/**
* Used to specify if the index should use segment replication. If false, document replication is used.
*/
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting(
"index.replication.segment_replication",
false,
Property.IndexScope,
Property.Final
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ public Condition newCondition() {
*/
public abstract IndexResult index(Index index) throws IOException;

public abstract IndexResult addIndexOperationToTranslog(Index index) throws IOException;

/**
* Perform document delete operation on the engine
* @param delete operation to perform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1068,23 +1068,7 @@ public IndexResult index(Index index) throws IOException {
}
}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
addIndexOperationToTranslog(index, indexResult);
}
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
Expand Down Expand Up @@ -1119,6 +1103,42 @@ public IndexResult index(Index index) throws IOException {
}
}

@Override
public Engine.IndexResult addIndexOperationToTranslog(Index index) throws IOException {
IndexingStrategy plan = indexingStrategyForOperation(index);
/**
* Matches the logic in {@link #indexIntoLucene(Index, IndexingStrategy)}
*/
IndexResult indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
index.seqNo(),
plan.currentNotFoundOrDeleted
);
addIndexOperationToTranslog(index, indexResult);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
}

private void addIndexOperationToTranslog(Index index, IndexResult indexResult) throws IOException {
Translog.Location location = null;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
}
indexResult.setTranslogLocation(location);
}

protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assert assertNonPrimaryOrigin(index);
// needs to maintain the auto_id timestamp in case this replica becomes primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ public IndexResult index(Index index) {
throw new UnsupportedOperationException("indexing is not supported on a read-only engine");
}

@Override
public IndexResult addIndexOperationToTranslog(Index index) throws IOException {
assert false : "this should not be called";
throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine");
}

@Override
public DeleteResult delete(Delete delete) {
assert false : "this should not be called";
Expand Down
135 changes: 96 additions & 39 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -835,21 +835,104 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
return applyIndexOperation(
getEngine(),
Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING);
if (isSegRepEnabled != null && isSegRepEnabled) {
Engine.Index index;
try {
index = parseSourceAndPrepareIndex(
seqNo,
opPrimaryTerm,
version,
null,
UNASSIGNED_SEQ_NO,
0,
autoGeneratedTimeStamp,
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse
);
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
return new Engine.IndexResult(update);
}
} catch (Exception e) {
return handleIndexFailure(seqNo, opPrimaryTerm, version, e);
}
return getEngine().addIndexOperationToTranslog(index);
} else {
return applyIndexOperation(
getEngine(),
seqNo,
opPrimaryTerm,
version,
null,
UNASSIGNED_SEQ_NO,
0,
autoGeneratedTimeStamp,
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse
);
}
}

private Engine.Index parseSourceAndPrepareIndex(
long seqNo,
long opPrimaryTerm,
long version,
@Nullable VersionType versionType,
long ifSeqNo,
long ifPrimaryTerm,
long autoGeneratedTimeStamp,
boolean isRetry,
Engine.Operation.Origin origin,
SourceToParse sourceToParse
) throws Exception {
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
+ " ] > shard term ["
+ getOperationPrimaryTerm()
+ "]";
ensureWriteAllowed(origin);
Engine.Index operation;
final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type());
final SourceToParse sourceWithResolvedType;
if (resolvedType.equals(sourceToParse.type())) {
sourceWithResolvedType = sourceToParse;
} else {
sourceWithResolvedType = new SourceToParse(
sourceToParse.index(),
resolvedType,
sourceToParse.id(),
sourceToParse.source(),
sourceToParse.getXContentType(),
sourceToParse.routing()
);
}
return prepareIndex(
docMapper(resolvedType),
sourceWithResolvedType,
seqNo,
opPrimaryTerm,
version,
null,
UNASSIGNED_SEQ_NO,
0,
versionType,
origin,
autoGeneratedTimeStamp,
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse
ifSeqNo,
ifPrimaryTerm
);
}

private Engine.IndexResult handleIndexFailure(long seqNo, long opPrimaryTerm, long version, Exception e) {
// We treat any exception during parsing and or mapping update as a document level failure
// with the exception side effects of closing the shard. Since we don't have the shard, we
// can not raise an exception that may block any replication of previous operations to the
// replicas
verifyNotClosed(e);
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
}

private Engine.IndexResult applyIndexOperation(
Engine engine,
long seqNo,
Expand All @@ -863,52 +946,26 @@ private Engine.IndexResult applyIndexOperation(
Engine.Operation.Origin origin,
SourceToParse sourceToParse
) throws IOException {
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
+ " ] > shard term ["
+ getOperationPrimaryTerm()
+ "]";
ensureWriteAllowed(origin);
Engine.Index operation;
try {
final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type());
final SourceToParse sourceWithResolvedType;
if (resolvedType.equals(sourceToParse.type())) {
sourceWithResolvedType = sourceToParse;
} else {
sourceWithResolvedType = new SourceToParse(
sourceToParse.index(),
resolvedType,
sourceToParse.id(),
sourceToParse.source(),
sourceToParse.getXContentType(),
sourceToParse.routing()
);
}
operation = prepareIndex(
docMapper(resolvedType),
sourceWithResolvedType,
operation = parseSourceAndPrepareIndex(
seqNo,
opPrimaryTerm,
version,
versionType,
origin,
ifSeqNo,
ifPrimaryTerm,
autoGeneratedTimeStamp,
isRetry,
ifSeqNo,
ifPrimaryTerm
origin,
sourceToParse
);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
return new Engine.IndexResult(update);
}
} catch (Exception e) {
// We treat any exception during parsing and or mapping update as a document level failure
// with the exception side effects of closing the shard. Since we don't have the shard, we
// can not raise an exception that may block any replication of previous operations to the
// replicas
verifyNotClosed(e);
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
return handleIndexFailure(seqNo, opPrimaryTerm, version, e);
}

return index(engine, operation);
Expand Down

0 comments on commit 4822c28

Please sign in to comment.