From e4ea9d93c802f9ab876a069c762a1affd645c1b6 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 2 Sep 2022 18:57:39 -0700 Subject: [PATCH] [Segment Replication] Extend FileChunkWriter to allow cancel on transport client (#4386) * [Segment Replication] Extend FileChunkWriter to allow cancel on retryable transport client Signed-off-by: Suraj Singh * Add changelog entry Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh * Integration test Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 72 ++++++++++++++++++- .../indices/recovery/FileChunkWriter.java | 2 + .../OngoingSegmentReplications.java | 2 +- .../RemoteSegmentFileChunkWriter.java | 5 ++ .../SegmentReplicationSourceHandler.java | 6 +- .../SegmentReplicationSourceHandlerTests.java | 3 + 7 files changed, 88 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3c58bae3fd94..e4092080bc5fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) - [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365)) +- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index b843b99c097a8..beb4ef0e3dcee 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -31,17 +31,23 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -61,6 +67,11 @@ public static void assumeFeatureFlag() { assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); } + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + @Override public Settings indexSettings() { return Settings.builder() @@ -314,6 +325,65 @@ public void testReplicationAfterForceMerge() throws Exception { } } + public void testCancellation() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureYellow(INDEX_NAME); + + final String replicaNode = internalCluster().startNode(); + + final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( + SegmentReplicationSourceService.class, + primaryNode + ); + final IndexShard primaryShard = getIndexShard(primaryNode); + + CountDownLatch latch = new CountDownLatch(1); + + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + FileChunkRequest req = (FileChunkRequest) request; + logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk()); + if (req.name().endsWith("cfs") && req.lastChunk()) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + + final int docCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + + flush(INDEX_NAME); + } + segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings()); + latch.countDown(); + assertDocCounts(docCount, primaryNode); + } + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java index cb43af3b82e09..f1cc7b8dd1d89 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -28,4 +28,6 @@ void writeFileChunk( int totalTranslogOps, ActionListener listener ); + + default void cancel() {} } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 828aa29192fe3..1a97d334df58f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -126,7 +126,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener