Skip to content

Commit

Permalink
Segment Replication - Implement segment replication event cancellatio…
Browse files Browse the repository at this point in the history
…n. (#4225) (#4387)

* Segment Replication.  Fix Cancellation of replication events.

This PR updates segment replication paths to correctly cancel replication events on the primary and replica.
In the source service, any ongoing event for a primary that is sending to a replica that shuts down or is promoted as a new primary are cancelled.
In the target service, any ongoing event for a replica that is promoted as a new primary or is fetching from a primary that shuts down.
It wires up SegmentReplicationSourceService as an IndexEventListener so that it can respond to events and cancel any ongoing transfer state.
This change also includes some test cleanup for segment replication to rely on actual components over mocks.

Signed-off-by: Marc Handalian <[email protected]>

Fix to not start/stop SegmentReplicationSourceService as a lifecycle component with feature flag off.

Signed-off-by: Marc Handalian <[email protected]>

Update logic to properly mark SegmentReplicationTarget as cancelled when cancel initiated by primary.

Signed-off-by: Marc Handalian <[email protected]>

Minor updates from self review.

Signed-off-by: Marc Handalian <[email protected]>

* Add missing changelog entry.

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit 19d1a2b)
  • Loading branch information
mch2 authored Sep 2, 2022
1 parent 03273b5 commit 1edb733
Show file tree
Hide file tree
Showing 19 changed files with 739 additions and 196 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Fixed
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
Expand Down Expand Up @@ -152,6 +153,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
Expand All @@ -170,6 +172,7 @@ public IndicesClusterStateService(
threadPool,
checkpointPublisher,
segmentReplicationTargetService,
segmentReplicationSourceService,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
Expand All @@ -191,6 +194,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
Expand All @@ -211,6 +215,7 @@ public IndicesClusterStateService(
// if segrep feature flag is not enabled, don't wire the target serivce as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
* @opensearch.internal
*/
class OngoingSegmentReplications {

private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
Expand Down Expand Up @@ -161,14 +160,27 @@ synchronized void cancel(IndexShard shard, String reason) {
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");

}

/**
Expand Down Expand Up @@ -243,11 +255,7 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
for (String allocationId : allocationIds) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
cancel(allocationId, reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,10 @@ public void getSegmentFiles(
);
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
}

@Override
public void cancel() {
transportClient.cancel();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.opensearch.action.ActionListener;
import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -47,4 +48,9 @@ void getSegmentFiles(
Store store,
ActionListener<GetSegmentFilesResponse> listener
);

/**
* Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}.
*/
default void cancel() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
final Closeable releaseResources = () -> IOUtils.close(resources);
try {
timer.start();
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
final RuntimeException e = new CancellableThreads.ExecutionCancelledException(
"replication was canceled reason [" + reason + "]"
);
if (beforeCancelEx != null) {
e.addSuppressed(beforeCancelEx);
}
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
Expand Down Expand Up @@ -153,6 +163,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
.createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.add(transfer);
cancellableThreads.checkForCancel();
transfer.start();

sendFileStep.whenComplete(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -42,7 +43,25 @@
*
* @opensearch.internal
*/
public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
public class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

// Empty Implementation, only required while Segment Replication is under feature flag.
public static final SegmentReplicationSourceService NO_OP = new SegmentReplicationSourceService() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
// NoOp;
}

@Override
public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
// NoOp;
}

@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
// NoOp;
}
};

private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
private final RecoverySettings recoverySettings;
Expand All @@ -62,6 +81,14 @@ public static class Actions {

private final OngoingSegmentReplications ongoingSegmentReplications;

// Used only for empty implementation.
private SegmentReplicationSourceService() {
recoverySettings = null;
ongoingSegmentReplications = null;
transportService = null;
indicesService = null;
}

public SegmentReplicationSourceService(
IndicesService indicesService,
TransportService transportService,
Expand Down Expand Up @@ -163,10 +190,25 @@ protected void doClose() throws IOException {

}

/**
*
* Cancels any replications on this node to a replica shard that is about to be closed.
*/
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
}
}

/**
* Cancels any replications on this node to a replica that has been promoted as primary.
*/
@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) {
ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public enum Stage {
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6);
FINALIZE_REPLICATION((byte) 6),
CANCELLED((byte) 7);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -118,6 +119,10 @@ protected void validateAndSetStage(Stage expected, Stage next) {
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
stopTimersAndSetStage(next);
}

private void stopTimersAndSetStage(Stage next) {
// save the timing data for the current step
stageTimer.stop();
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
Expand Down Expand Up @@ -155,6 +160,14 @@ public void setStage(Stage stage) {
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
case CANCELLED:
if (this.stage == Stage.DONE) {
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
}
stopTimersAndSetStage(Stage.CANCELLED);
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -103,7 +104,15 @@ public String description() {

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), e, sendShardFailure);
// Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
}

@Override
Expand Down Expand Up @@ -134,13 +143,22 @@ public void writeFileChunk(
* @param listener {@link ActionListener} listener.
*/
public void startReplication(ActionListener<Void> listener) {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
// This method only executes when cancellation is triggered by this node and caught by a call to checkForCancel,
// SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Expand All @@ -154,6 +172,7 @@ public void startReplication(ActionListener<Void> listener) {

private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
Expand Down Expand Up @@ -188,12 +207,14 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
ActionListener.completeWith(listener, () -> {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
Expand Down Expand Up @@ -261,4 +282,10 @@ Store.MetadataSnapshot getMetadataSnapshot() throws IOException {
}
return store.getMetadata(indexShard.getSegmentInfosSnapshot().get());
}

@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
source.cancel();
}
}
Loading

0 comments on commit 1edb733

Please sign in to comment.