Skip to content

Commit

Permalink
Fix recovery states to move to completed during intiial shard recover…
Browse files Browse the repository at this point in the history
…y and mark the shard as active.

With this change IndexShard.startRecovery will only set up a replica shard and mark
it as tracked with the primary.  We will then only start replication after the
primary has refreshed after performing the first operation.
This also avoids a condition when the initial recovery is trying to replicate
from a primary shard that has not performed any operations and waits indefinately for
a replica to catch up to the latest sequence number.  This change also ensures that
we are only ever performing one replication event at any given moment.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 18, 2022
1 parent a48d1f3 commit b1a0510
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 97 deletions.
73 changes: 47 additions & 26 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -3081,8 +3082,19 @@ public void startRecovery(
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
markAsReplicating();
segmentReplicationService.startReplication(new ReplicationCheckpoint(this.shardId, -10, -10, 0), this, replicationSource, replicationListener);
segmentReplicationService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Segrep recovery complete.");
}
@Override
public void onFailure(Exception e) {
logger.error("fail", e);
}
});
} catch (Exception e) {
logger.error("Error", e);
failShard("corrupted preexisting index", e);
Expand Down Expand Up @@ -3692,32 +3704,37 @@ public void sync() throws IOException {
/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public void onNewCheckpoint(final PublishCheckpointRequest request,
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request,
final PrimaryShardReplicationSource source,
final SegmentReplicationService segmentReplicationService) {
if (this.replicationState.getStage() == ReplicationState.Stage.INACTIVE) {
try {
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
if (RecoveryState.Stage.TRANSLOG != this.recoveryState.getStage() && this.state.equals(IndexShardState.RECOVERING) == false) {
logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
return;
if (state.equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
return;
}
if (isReplicating()) {
logger.info("Ignore - shard is currently replicating to a checkpoint");
return;
}
try {
markAsReplicating();
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() {
@Override
public void onReplicationDone(ReplicationState state) {
finalizeReplication();
logger.info("Replication complete.");
}
segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() {
@Override
public void onReplicationDone(ReplicationState state) {
logger.trace("Replication complete.");
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("Failure", e);
}
});
} catch (Exception e) {
logger.error("Error", e);
}
@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
finalizeReplication();
logger.error("Failure", e);
}
});
} catch (Exception e) {
logger.error("Error", e);
}
}

Expand All @@ -3726,11 +3743,15 @@ public ReplicationState getReplicationState() {
}

public void markAsReplicating() {
replicationState.setStage(ReplicationState.Stage.ACTIVE);
this.replicationState.setStage(ReplicationState.Stage.ACTIVE);
}

public void finalizeReplication() {
replicationState.setStage(ReplicationState.Stage.INACTIVE);
this.replicationState.setStage(ReplicationState.Stage.INACTIVE);
}

private boolean isReplicating() {
return this.replicationState.getStage() == ReplicationState.Stage.ACTIVE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
indicesService.createShard(
shardRouting,
replicationReplicaService,
new ReplicationListener(),
new ReplicationListener(shardRouting, primaryTerm),
replicationSource,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
Expand Down Expand Up @@ -761,16 +761,31 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(

private class ReplicationListener implements SegmentReplicationService.ReplicationListener {

private ReplicationListener() {}
/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;

/**
* Primary term with which the shard was created
*/
private final long primaryTerm;

private ReplicationListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}

@Override
public void onReplicationDone(final ReplicationState state) {
logger.info("Replication Done");
logger.info("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("Replication failed", e);
handleRecoveryFailure(shardRouting, sendShardFailure, e);
logger.error("Shard setup failed", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRecoveryException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.Timer;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
Expand All @@ -29,7 +38,16 @@
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.ReplicationTarget;
import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService;
import org.opensearch.indices.replication.copy.TrackShardRequest;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Orchestrator of replication events.
Expand All @@ -40,6 +58,7 @@ public class SegmentReplicationService implements IndexEventListener {

private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;
private final TransportService transportService;

public ReplicationCollection getOnGoingReplications() {
return onGoingReplications;
Expand All @@ -48,9 +67,11 @@ public ReplicationCollection getOnGoingReplications() {
private final ReplicationCollection onGoingReplications;

public SegmentReplicationService(final ThreadPool threadPool,
final RecoverySettings recoverySettings) {
final RecoverySettings recoverySettings,
final TransportService transportService) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
this.transportService = transportService;
this.onGoingReplications = new ReplicationCollection(logger, threadPool);
}

Expand All @@ -61,8 +82,36 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

public void prepareForReplication(IndexShard indexShard, DiscoveryNode targetNode, DiscoveryNode sourceNode, ActionListener<TrackShardResponse> listener) {
setupReplicaShard(indexShard);
transportService.sendRequest(sourceNode,
SegmentReplicationPrimaryService.Actions.TRACK_SHARD,
new TrackShardRequest(indexShard.shardId(), indexShard.routingEntry().allocationId().getId(), targetNode),
new ActionListenerResponseHandler<>(listener, TrackShardResponse::new));
}

private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryException {
indexShard.prepareForIndexRecovery();
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, indexShard.shardId(),
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
indexShard.persistRetentionLeases();
indexShard.openEngineAndRecoverFromTranslog();
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e);
} finally {
store.decRef();
}
}

public void startReplication(final ReplicationCheckpoint checkpoint, final IndexShard indexShard, PrimaryShardReplicationSource source, final ReplicationListener listener) {
final long replicationId = onGoingReplications.startReplication(checkpoint, indexShard, source, listener, recoverySettings.activityTimeout());
logger.info("Starting replication {}", replicationId);
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

Expand Down Expand Up @@ -144,11 +193,11 @@ public void onResponse(ReplicationResponse replicationResponse) {
// final TimeValue replicationTime = new TimeValue(timer.time());
logger.trace("Replication complete {}", replicationId);
onGoingReplications.markReplicationAsDone(replicationId);
shard.finalizeReplication();
}

@Override
public void onFailure(Exception e) {
logger.error("Error", e);
if (logger.isTraceEnabled()) {
logger.trace(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,21 @@ public void sendFiles(CopyState copyState, ActionListener<GetFilesResponse> list
throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
}

// TODO: This only needs to happen on the initial setup.
sendFileStep.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.initiateTracking(targetAllocationId),
shardId + " initiating tracking of " + targetAllocationId, shard, cancellableThreads, logger);

runUnderPrimaryPermit(
() -> shard.updateLocalCheckpointForShard(targetAllocationId, request.getCheckpoint().getSeqNo()),
shardId + " updating local checkpoint for " + targetAllocationId,
shard,
cancellableThreads,
logger
);
runUnderPrimaryPermit(
() -> shard.markAllocationIdAsInSync(targetAllocationId, request.getCheckpoint().getSeqNo()),
shardId + " marking " + targetAllocationId + " as in sync",
shard,
cancellableThreads,
logger
);
try {
future.onResponse(new GetFilesResponse());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,9 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRecoveryException;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.MultiFileWriter;
import org.opensearch.indices.recovery.RecoveryRequestTracker;
import org.opensearch.indices.recovery.RecoveryState;
Expand Down Expand Up @@ -124,48 +120,18 @@ public ReplicationTarget(ReplicationCheckpoint checkpoint, IndexShard indexShard
}

public void startReplication(ActionListener<ReplicationResponse> listener) {
final StepListener<Boolean> shardStartedListener = new StepListener<>();
final StepListener<TransportCheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

ensureShardStarted(shardStartedListener);

// Get list of files to copy from this checkpoint.
shardStartedListener.whenComplete(r -> source.getCheckpointInfo(replicationId, checkpoint, checkpointInfoListener), listener::onFailure);
source.getCheckpointInfo(replicationId, checkpoint, checkpointInfoListener);

checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure);
getFilesListener.whenComplete(response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), listener::onFailure);
finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure);
}

private void ensureShardStarted(StepListener<Boolean> shardStartedListener) {
if (indexShard.recoveryState().getStage() == RecoveryState.Stage.INIT) {
setupReplicaShard(indexShard, shardStartedListener);
} else {
shardStartedListener.onResponse(true);
}
}

private void setupReplicaShard(IndexShard indexShard, StepListener<Boolean> shardStartedListener) throws IndexShardRecoveryException {
indexShard.prepareForIndexRecovery();
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, indexShard.shardId(),
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
indexShard.persistRetentionLeases();
indexShard.openEngineAndRecoverFromTranslog();
shardStartedListener.onResponse(true);
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e);
} finally {
store.decRef();
}
}

public Store store() {
ensureRefCount();
Expand Down
Loading

0 comments on commit b1a0510

Please sign in to comment.