Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica #4424

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,23 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -60,6 +66,11 @@ public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

/**
 * Supplies the plugins installed on the test nodes: only {@link MockTransportService.TestPlugin},
 * so that tests in this class can obtain the node's transport service as a {@link MockTransportService}.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Class<? extends Plugin> mockTransportPlugin = MockTransportService.TestPlugin.class;
    return Arrays.asList(mockTransportPlugin);
}

@Override
public Settings indexSettings() {
return Settings.builder()
Expand Down Expand Up @@ -313,6 +324,65 @@ public void testReplicationAfterForceMerge() throws Exception {
}
}

/**
 * Exercises cancellation on the primary while a file chunk is held back in the transport layer.
 * <p>
 * A send behavior installed on the primary's {@link MockTransportService} parks the sending thread on a latch
 * when the last chunk of a {@code cfs} file is about to be sent to the replica node. While that chunk is parked,
 * the test indexes documents, flushes, and invokes
 * {@code SegmentReplicationSourceService#beforeIndexShardClosed} for the primary shard
 * (NOTE(review): presumably this is what cancels the in-flight replication - confirm against the service).
 * The latch is then released and the doc count on the primary node is asserted.
 *
 * @throws Exception if cluster setup, indexing, or the assertions fail
 */
public void testCancellation() throws Exception {
    final String primaryNode = internalCluster().startNode();
    createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
    ensureYellow(INDEX_NAME);

    final String replicaNode = internalCluster().startNode();

    final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
        SegmentReplicationSourceService.class,
        primaryNode
    );
    final IndexShard primaryShard = getIndexShard(primaryNode);

    // Gate that keeps the final chunk of a cfs file from leaving the primary until the test releases it.
    final CountDownLatch latch = new CountDownLatch(1);

    final MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
        TransportService.class,
        primaryNode
    ));
    mockTransportService.addSendBehavior(
        internalCluster().getInstance(TransportService.class, replicaNode),
        (connection, requestId, action, request, options) -> {
            if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
                FileChunkRequest req = (FileChunkRequest) request;
                logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
                if (req.name().endsWith("cfs") && req.lastChunk()) {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers further up the stack can still observe it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }
            connection.sendRequest(requestId, action, request, options);
        }
    );

    final int docCount = scaledRandomIntBetween(0, 200);
    try {
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(docCount);
            waitForDocs(docCount, indexer);

            flush(INDEX_NAME);
        }
        segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings());
    } finally {
        // Always release the parked send thread, even when a step above throws; otherwise it would block forever.
        latch.countDown();
    }
    assertDocCounts(docCount, primaryNode);
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ void writeFileChunk(
int totalTranslogOps,
ActionListener<Void> listener
);

/**
 * Cancels this writer. The default implementation does nothing; implementations may override it.
 */
default void cancel() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
* node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
* with the list of required files.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,9 @@ public void writeFileChunk(
reader
);
}

/**
 * Cancels this writer by delegating to {@code retryableTransportClient.cancel()}.
 */
@Override
public void cancel() {
retryableTransportClient.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class SegmentReplicationSourceHandler {
private final DiscoveryNode targetNode;
private final String allocationId;

private final FileChunkWriter writer;

/**
* Constructor.
*
Expand Down Expand Up @@ -96,6 +98,7 @@ class SegmentReplicationSourceHandler {
);
this.allocationId = allocationId;
this.copyState = copyState;
this.writer = writer;
}

/**
Expand Down Expand Up @@ -186,9 +189,10 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
}

/**
 * Cancels the replication and interrupts all eligible threads.
 * <p>
 * The file chunk writer is cancelled first, then {@code cancellableThreads} is cancelled with the given reason.
 *
 * @param reason the reason forwarded to {@code cancellableThreads.cancel}
 */
public void cancel(String reason) {
writer.cancel();
cancellableThreads.cancel(reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase {

Expand Down Expand Up @@ -245,6 +247,7 @@ public void onFailure(Exception e) {
}
});
latch.await(2, TimeUnit.SECONDS);
verify(chunkWriter, times(1)).cancel();
assertEquals("listener should have resolved with failure", 0, latch.getCount());
}
}