diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 1ce4a346e5dc3..bc0c49143fe4b 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -269,6 +269,7 @@ private void finishRequest() { context.getLocationToSync(), null, context.getPrimary(), + context.getPrimary().shouldForward(), logger ) ); diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java index 68c5416f3603e..b1097db78eca2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java @@ -151,7 +151,7 @@ public void execute() throws Exception { private void handlePrimaryResult(final PrimaryResultT primaryResult) { this.primaryResult = primaryResult; final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); - if (replicaRequest != null) { + if (replicaRequest != null && primaryResult.shouldForward()) { if (logger.isTraceEnabled()) { logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request); } @@ -621,5 +621,7 @@ public interface PrimaryResult> { * @param listener calllback that is invoked after post replication actions have completed * */ void runPostReplicationActions(ActionListener listener); + + boolean shouldForward(); } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 229482a0f76b2..ebea21e064485 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -558,12 +558,18 @@ public static class PrimaryResultfinalResponseIfSuccessful or finalFailure to be not-null */ - public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { + public PrimaryResult( + ReplicaRequest replicaRequest, + Response finalResponseIfSuccessful, + Exception finalFailure, + boolean shouldForward + ) { assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null, " + "found [" + finalFailure @@ -573,6 +579,11 @@ public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSucc this.replicaRequest = replicaRequest; this.finalResponseIfSuccessful = finalResponseIfSuccessful; this.finalFailure = finalFailure; + this.shouldForward = shouldForward; + } + + public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { + this(replicaRequest, finalResponseIfSuccessful, finalFailure, true); } public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) { @@ -599,6 +610,11 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } } + + @Override + public boolean shouldForward() { + return shouldForward; + } } public static class ReplicaResult { diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index c9fb959306b9c..3726589ad9f87 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -279,9 +279,10 @@ public WritePrimaryResult( @Nullable Location location, @Nullable Exception operationFailure, IndexShard primary, + boolean shouldForward, Logger logger ) { - super(request, finalResponse, operationFailure); + super(request, finalResponse, operationFailure, shouldForward); this.location = location; this.primary = primary; this.logger = logger; @@ -293,6 +294,17 @@ public WritePrimaryResult( + "] failure"; } + public WritePrimaryResult( + ReplicaRequest request, + @Nullable Response finalResponse, + @Nullable Location location, + @Nullable Exception operationFailure, + IndexShard primary, + Logger logger + ) { + this(request, finalResponse, location, operationFailure, primary, true, logger); + } + @Override public void runPostReplicationActions(ActionListener listener) { if (finalFailure != null) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0ad3c2cb0478f..14f4d165c9095 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -835,8 +835,7 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING); - if (isSegRepEnabled != null && isSegRepEnabled) { + if (isSegmentReplicationEnabled()) { Engine.Index index; try { index = parseSourceAndPrepareIndex( @@ -4096,4 +4095,17 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException { RetentionLeaseSyncer getRetentionLeaseSyncer() { return retentionLeaseSyncer; } + + /** + * Controls whether requests should be forwarded from the + * primary to the replica. + */ + public boolean shouldForward() { + // Eventually this will also incorporate the presence of pluggable translog + return !isSegmentReplicationEnabled(); + } + + private boolean isSegmentReplicationEnabled() { + return indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING); + } } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 43d5a85094a36..63f549d290d7e 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -415,6 +415,14 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + /** + * Default implementation. + */ + @Override + public boolean shouldForward() { + return true; + } + public ReplicationResponse.ShardInfo getShardInfo() { return shardInfo.get(); } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index 8a4cdfc953bf8..dbbfd8b6f847d 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -669,6 +669,14 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + /** + * Default implementation. + */ + @Override + public boolean shouldForward() { + return true; + } + public ShardInfo getShardInfo() { return shardInfo; } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 97443f6cba811..68e7f4faf08d5 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -869,6 +869,14 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + + /** + * Default implementation + */ + @Override + public boolean shouldForward() { + return true; + } } }