From 18fb405d0c3959de0aef1aceaf5523e481e24a1d Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 8 Feb 2022 12:00:09 -0800 Subject: [PATCH] =?UTF-8?q?Changes=20to=20enable=20=E2=80=9CshouldForward?= =?UTF-8?q?=E2=80=9D=20gating?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds a shouldForward operation to the ReplicationOperation.PrimaryResult interface. This is used by the logic in ReplicationOperation to gate forwarding of the request to replicas. The TransportReplicationAction.PrimaryResult subclass incorporates a member variable to store this value. The value is eventually populated by the WritePrimaryResult constructor, based on a new IndexShard API. This API internally depends on the segment-replication setting. Meanwhile, other implementing classes just use a defuault hard-coded implementation to maintain backwards compatibility. Signed-off-by: Kartik Ganesh --- .../action/bulk/TransportShardBulkAction.java | 1 + .../replication/ReplicationOperation.java | 4 +++- .../TransportReplicationAction.java | 18 +++++++++++++++++- .../replication/TransportWriteAction.java | 14 +++++++++++++- .../org/opensearch/index/shard/IndexShard.java | 16 ++++++++++++++-- ...sportVerifyShardBeforeCloseActionTests.java | 8 ++++++++ .../replication/ReplicationOperationTests.java | 8 ++++++++ ...penSearchIndexLevelReplicationTestCase.java | 8 ++++++++ 8 files changed, 72 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 1ce4a346e5dc3..bc0c49143fe4b 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -269,6 +269,7 @@ private void finishRequest() { context.getLocationToSync(), null, context.getPrimary(), + context.getPrimary().shouldForward(), logger ) ); diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java index 68c5416f3603e..b1097db78eca2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java @@ -151,7 +151,7 @@ public void execute() throws Exception { private void handlePrimaryResult(final PrimaryResultT primaryResult) { this.primaryResult = primaryResult; final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); - if (replicaRequest != null) { + if (replicaRequest != null && primaryResult.shouldForward()) { if (logger.isTraceEnabled()) { logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request); } @@ -621,5 +621,7 @@ public interface PrimaryResult> { * @param listener calllback that is invoked after post replication actions have completed * */ void runPostReplicationActions(ActionListener listener); + + boolean shouldForward(); } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 229482a0f76b2..ebea21e064485 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -558,12 +558,18 @@ public static class PrimaryResultfinalResponseIfSuccessful or finalFailure to be not-null */ - public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { + public PrimaryResult( + ReplicaRequest replicaRequest, + Response finalResponseIfSuccessful, + Exception finalFailure, + boolean shouldForward + ) { assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null, " + "found [" + finalFailure @@ -573,6 +579,11 @@ public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSucc this.replicaRequest = replicaRequest; this.finalResponseIfSuccessful = finalResponseIfSuccessful; this.finalFailure = finalFailure; + this.shouldForward = shouldForward; + } + + public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { + this(replicaRequest, finalResponseIfSuccessful, finalFailure, true); } public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) { @@ -599,6 +610,11 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } } + + @Override + public boolean shouldForward() { + return shouldForward; + } } public static class ReplicaResult { diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index c9fb959306b9c..3726589ad9f87 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -279,9 +279,10 @@ public WritePrimaryResult( @Nullable Location location, @Nullable Exception operationFailure, IndexShard primary, + boolean shouldForward, Logger logger ) { - super(request, finalResponse, operationFailure); + super(request, finalResponse, operationFailure, shouldForward); this.location = location; this.primary = primary; this.logger = logger; @@ -293,6 +294,17 @@ public WritePrimaryResult( + "] failure"; } + public WritePrimaryResult( + ReplicaRequest request, + @Nullable Response finalResponse, + @Nullable Location location, + @Nullable Exception operationFailure, + IndexShard primary, + Logger logger + ) { + this(request, finalResponse, location, operationFailure, primary, true, logger); + } + @Override public void runPostReplicationActions(ActionListener listener) { if (finalFailure != null) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0ad3c2cb0478f..14f4d165c9095 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -835,8 +835,7 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING); - if (isSegRepEnabled != null && isSegRepEnabled) { + if (isSegmentReplicationEnabled()) { Engine.Index index; try { index = parseSourceAndPrepareIndex( @@ -4096,4 +4095,17 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException { RetentionLeaseSyncer getRetentionLeaseSyncer() { return retentionLeaseSyncer; } + + /** + * Controls whether requests should be forwarded from the + * primary to the replica. + */ + public boolean shouldForward() { + // Eventually this will also incorporate the presence of pluggable translog + return !isSegmentReplicationEnabled(); + } + + private boolean isSegmentReplicationEnabled() { + return indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING); + } } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 43d5a85094a36..63f549d290d7e 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -415,6 +415,14 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + /** + * Default implementation. + */ + @Override + public boolean shouldForward() { + return true; + } + public ReplicationResponse.ShardInfo getShardInfo() { return shardInfo.get(); } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index 8a4cdfc953bf8..dbbfd8b6f847d 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -669,6 +669,14 @@ public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + /** + * Default implementation. + */ + @Override + public boolean shouldForward() { + return true; + } + public ShardInfo getShardInfo() { return shardInfo; } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 97443f6cba811..68e7f4faf08d5 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -869,6 +869,14 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { public void runPostReplicationActions(ActionListener listener) { listener.onResponse(null); } + + /** + * Default implementation + */ + @Override + public boolean shouldForward() { + return true; + } } }