Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to enable “shouldForward” gating #2084

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private void finishRequest() {
context.getLocationToSync(),
null,
context.getPrimary(),
context.getPrimary().shouldForward(),
logger
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void execute() throws Exception {
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (replicaRequest != null && primaryResult.shouldForward()) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
}
Expand Down Expand Up @@ -621,5 +621,7 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
* @param listener calllback that is invoked after post replication actions have completed
* */
void runPostReplicationActions(ActionListener<Void> listener);

boolean shouldForward();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,18 @@ public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<Repl
protected final ReplicaRequest replicaRequest;
public final Response finalResponseIfSuccessful;
public final Exception finalFailure;
public final boolean shouldForward;

/**
* Result of executing a primary operation
* expects <code>finalResponseIfSuccessful</code> or <code>finalFailure</code> to be not-null
*/
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
public PrimaryResult(
ReplicaRequest replicaRequest,
Response finalResponseIfSuccessful,
Exception finalFailure,
boolean shouldForward
) {
assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null, "
+ "found ["
+ finalFailure
Expand All @@ -573,6 +579,11 @@ public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSucc
this.replicaRequest = replicaRequest;
this.finalResponseIfSuccessful = finalResponseIfSuccessful;
this.finalFailure = finalFailure;
this.shouldForward = shouldForward;
}

public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
this(replicaRequest, finalResponseIfSuccessful, finalFailure, true);
}

public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) {
Expand All @@ -599,6 +610,11 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}
}

@Override
public boolean shouldForward() {
return shouldForward;
}
}

public static class ReplicaResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ public WritePrimaryResult(
@Nullable Location location,
@Nullable Exception operationFailure,
IndexShard primary,
boolean shouldForward,
Logger logger
) {
super(request, finalResponse, operationFailure);
super(request, finalResponse, operationFailure, shouldForward);
this.location = location;
this.primary = primary;
this.logger = logger;
Expand All @@ -293,6 +294,17 @@ public WritePrimaryResult(
+ "] failure";
}

public WritePrimaryResult(
ReplicaRequest request,
@Nullable Response finalResponse,
@Nullable Location location,
@Nullable Exception operationFailure,
IndexShard primary,
Logger logger
) {
this(request, finalResponse, location, operationFailure, primary, true, logger);
}

@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
if (finalFailure != null) {
Expand Down
16 changes: 14 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING);
if (isSegRepEnabled != null && isSegRepEnabled) {
if (isSegmentReplicationEnabled()) {
Engine.Index index;
try {
index = parseSourceAndPrepareIndex(
Expand Down Expand Up @@ -4096,4 +4095,17 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Controls whether requests should be forwarded from the
* primary to the replica.
*/
public boolean shouldForward() {
// Eventually this will also incorporate the presence of pluggable translog
return !isSegmentReplicationEnabled();
}

private boolean isSegmentReplicationEnabled() {
return indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}

/**
* Default implementation.
*/
@Override
public boolean shouldForward() {
return true;
}

public ReplicationResponse.ShardInfo getShardInfo() {
return shardInfo.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,14 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}

/**
* Default implementation.
*/
@Override
public boolean shouldForward() {
return true;
}

public ShardInfo getShardInfo() {
return shardInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,14 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
public void runPostReplicationActions(ActionListener<Void> listener) {
listener.onResponse(null);
}

/**
* Default implementation
*/
@Override
public boolean shouldForward() {
return true;
}
}

}
Expand Down