Renaming the temporary RTarget class to ReplicationTarget
The existing ReplicationTarget has been renamed to SegmentReplicationTarget. Alongside this change, unnecessary wrapper methods have been removed and replaced with more direct invocations.

Signed-off-by: Kartik Ganesh <[email protected]>
kartg committed Mar 12, 2022
1 parent 5cdd077 commit 97d02fd
Showing 9 changed files with 94 additions and 119 deletions.
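The diff below repeats one pattern at many call sites: the removed wrapper methods recoveryId(), shardId(), and translogLocation() on RecoveryTarget are replaced by direct calls to the accessors they delegated to. A condensed before/after sketch of that pattern (variable names are illustrative):

    // before: thin wrappers on RecoveryTarget
    long id = recoveryTarget.recoveryId();
    ShardId shardId = recoveryTarget.shardId();
    Path translogPath = recoveryTarget.translogLocation();

    // after: direct invocations on the underlying objects
    long id = recoveryTarget.getId();
    ShardId shardId = recoveryTarget.indexShard().shardId();
    Path translogPath = recoveryTarget.indexShard().shardPath().resolveTranslog();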
======================= changed file =======================
@@ -84,6 +84,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
+ import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@@ -203,7 +204,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
if (newTarget != null) {
- threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
+ threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId()));
}
}

@@ -232,7 +233,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
logger.trace("{} preparing shard for peer recovery", recoveryTarget.indexShard().shardId());
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
@@ -292,15 +293,16 @@ public static StartRecoveryRequest getStartRecoveryRequest(
long startingSeqNo
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
logger.trace("{} collecting local files for [{}]", recoveryTarget.indexShard().shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
- final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
+ Path translogPath = recoveryTarget.indexShard().shardPath().resolveTranslog();
+ final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(
@@ -335,15 +337,15 @@ public static StartRecoveryRequest getStartRecoveryRequest(
}
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
}
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
logger.trace("{} local file count [{}]", recoveryTarget.indexShard().shardId(), metadataSnapshot.size());
request = new StartRecoveryRequest(
- recoveryTarget.shardId(),
+ recoveryTarget.indexShard().shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
localNode,
metadataSnapshot,
recoveryTarget.state().getPrimary(),
- recoveryTarget.recoveryId(),
+ recoveryTarget.getId(),
startingSeqNo
);
return request;
======================= changed file =======================
@@ -82,20 +82,20 @@ public long startRecovery(
) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
- return recoveryTarget.recoveryId();
+ return recoveryTarget.getId();
}

private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) {
- RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget);
+ RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.getId(), recoveryTarget);
assert existingTarget == null : "found two RecoveryStatus instances with the same id";
logger.trace(
"{} started recovery from {}, id [{}]",
- recoveryTarget.shardId(),
+ recoveryTarget.indexShard().shardId(),
recoveryTarget.sourceNode(),
- recoveryTarget.recoveryId()
+ recoveryTarget.getId()
);
threadPool.schedule(
- new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
+ new RecoveryMonitor(recoveryTarget.getId(), recoveryTarget.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
);
@@ -134,21 +134,21 @@ public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activ
if (successfulReset) {
logger.trace(
"{} restarted recovery from {}, id [{}], previous id [{}]",
- newRecoveryTarget.shardId(),
+ newRecoveryTarget.indexShard().shardId(),
newRecoveryTarget.sourceNode(),
- newRecoveryTarget.recoveryId(),
- oldRecoveryTarget.recoveryId()
+ newRecoveryTarget.getId(),
+ oldRecoveryTarget.getId()
);
return newRecoveryTarget;
} else {
logger.trace(
"{} recovery could not be reset as it is already cancelled, recovery from {}, id [{}], previous id [{}]",
- newRecoveryTarget.shardId(),
+ newRecoveryTarget.indexShard().shardId(),
newRecoveryTarget.sourceNode(),
- newRecoveryTarget.recoveryId(),
- oldRecoveryTarget.recoveryId()
+ newRecoveryTarget.getId(),
+ oldRecoveryTarget.getId()
);
- cancelRecovery(newRecoveryTarget.recoveryId(), "recovery cancelled during reset");
+ cancelRecovery(newRecoveryTarget.getId(), "recovery cancelled during reset");
return null;
}
} catch (Exception e) {
@@ -184,7 +184,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
throw new IndexShardClosedException(shardId);
}
RecoveryTarget recoveryTarget = recoveryRef.get(RecoveryTarget.class);
- assert recoveryTarget.shardId().equals(shardId);
+ assert recoveryTarget.indexShard().shardId().equals(shardId);
return recoveryRef;
}

@@ -195,9 +195,9 @@ public boolean cancelRecovery(long id, String reason) {
if (removed != null) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
- removed.shardId(),
+ removed.indexShard().shardId(),
removed.sourceNode(),
- removed.recoveryId(),
+ removed.getId(),
reason
);
removed.cancel(reason);
@@ -218,9 +218,9 @@ public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFa
if (removed != null) {
logger.trace(
"{} failing recovery from {}, id [{}]. Send shard failure: [{}]",
- removed.shardId(),
+ removed.indexShard().shardId(),
removed.sourceNode(),
- removed.recoveryId(),
+ removed.getId(),
sendShardFailure
);
removed.fail(e, sendShardFailure);
@@ -231,7 +231,12 @@ public void markRecoveryAsDone(long id) {
public void markRecoveryAsDone(long id) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId());
logger.trace(
"{} marking recovery from {} as done, id [{}]",
removed.indexShard().shardId(),
removed.sourceNode(),
removed.getId()
);
removed.markAsDone();
}
}
@@ -254,7 +259,7 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
synchronized (onGoingRecoveries) {
for (Iterator<RecoveryTarget> it = onGoingRecoveries.values().iterator(); it.hasNext();) {
RecoveryTarget status = it.next();
- if (status.shardId().equals(shardId)) {
+ if (status.indexShard().shardId().equals(shardId)) {
matchedRecoveries.add(status);
it.remove();
}
@@ -263,9 +268,9 @@
for (RecoveryTarget removed : matchedRecoveries) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
- removed.shardId(),
+ removed.indexShard().shardId(),
removed.sourceNode(),
- removed.recoveryId(),
+ removed.getId(),
reason
);
removed.cancel(reason);
======================= changed file =======================
@@ -52,22 +52,20 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardNotRecoveringException;
import org.opensearch.index.shard.IndexShardState;
- import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
- import org.opensearch.indices.replication.common.RTarget;
+ import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
- import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link RecoveriesCollection}.
*/
- public class RecoveryTarget extends RTarget implements RecoveryTargetHandler {
+ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler {

private static final String RECOVERY_PREFIX = "recovery.";

@@ -90,6 +88,11 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecov
indexShard.recoveryStats().incCurrentAsTarget();
}

+ @Override
+ public String toString() {
+ return indexShard.shardId() + " [" + getId() + "]";
+ }
+
@Override
protected String getPrefix() {
return RECOVERY_PREFIX;
@@ -132,18 +135,14 @@ protected void closeInternal() {
}
}

+ /**
+ * Wrapper method around the internal {@link #listener} object
+ * to enforce stronger typing on the return type.
+ */
public PeerRecoveryTargetService.RecoveryListener getListener() {
return (PeerRecoveryTargetService.RecoveryListener) listener;
}

- public long recoveryId() {
- return getId();
- }
-
- public ShardId shardId() {
- return indexShard.shardId();
- }
-
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
@@ -160,7 +159,7 @@ boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOE
final long recoveryId = getId();
if (finished.compareAndSet(false, true)) {
try {
logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId);
logger.debug("reset of recovery with shard {} and id [{}]", indexShard.shardId(), recoveryId);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now.
decRef();
@@ -170,7 +169,7 @@
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace(
"new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close",
- shardId(),
+ indexShard.shardId(),
recoveryId
);
return false;
@@ -194,11 +193,6 @@ public void notifyListener(RecoveryFailedException e, boolean sendShardFailure)
listener.onFailure(state(), e, sendShardFailure);
}

- @Override
- public String toString() {
- return shardId() + " [" + getId() + "]";
- }
-
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
@@ -266,7 +260,7 @@ public void indexTranslogOperations(
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
- throw new IndexShardNotRecoveringException(shardId(), indexShard().state());
+ throw new IndexShardNotRecoveringException(indexShard.shardId(), indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
@@ -352,7 +346,7 @@ public void cleanFiles(
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
globalCheckpoint,
- shardId(),
+ indexShard.shardId(),
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
@@ -407,21 +401,13 @@ public void writeFileChunk(
) {
- try {
- state().getTranslog().totalOperations(totalTranslogOps);
- multiFileWriter.writeFileChunk(fileMetadata, position, content, lastChunk);
- listener.onResponse(null);
- } catch (Exception e) {
- listener.onFailure(e);
- }
+ state().getTranslog().totalOperations(totalTranslogOps);
+ this.writeFileChunk(fileMetadata, position, content, lastChunk, listener);
}

- /** Get a temporary name for the provided file name. */
+ /**
+ * Get a temporary name for the provided file name.
+ * This is only used in tests.
+ */
public String getTempNameForFile(String origFile) {
return multiFileWriter.getTempNameForFile(origFile);
}

- Path translogLocation() {
- return indexShard().shardPath().resolveTranslog();
- }
}
======================= changed file =======================
@@ -43,7 +43,7 @@
import org.opensearch.indices.replication.copy.ReplicationCollection;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
- import org.opensearch.indices.replication.copy.ReplicationTarget;
+ import org.opensearch.indices.replication.copy.SegmentReplicationTarget;
import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService;
import org.opensearch.indices.replication.copy.TrackShardRequest;
import org.opensearch.indices.replication.copy.TrackShardResponse;
@@ -181,7 +181,7 @@ private void doReplication(final long replicationId) {
logger.trace("not running replication with id [{}] - can not find it (probably finished)", replicationId);
return;
}
- final ReplicationTarget replicationTarget = replicationRef.get(ReplicationTarget.class);
+ final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class);
timer = replicationTarget.state().getTimer();
final IndexShard indexShard = replicationTarget.indexShard();

@@ -218,7 +218,7 @@ public void onFailure(Exception e) {
() -> new ParameterizedMessage("unexpected error during replication [{}], failing shard", replicationId),
e
);
- ReplicationTarget replicationTarget = replicationRef.get(ReplicationTarget.class);
+ SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class);
onGoingReplications.failReplication(
replicationId,
new ReplicationFailedException(replicationTarget.indexShard(), "unexpected error", e),
======================= changed file =======================
@@ -25,7 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

- public abstract class RTarget extends AbstractRefCounted {
+ public abstract class ReplicationTarget extends AbstractRefCounted {

// TODO will this cause issues because its shared between subclasses?
private static final AtomicLong ID_GENERATOR = new AtomicLong();
@@ -53,7 +53,7 @@ public abstract class RTarget extends AbstractRefCounted {

public abstract RState state();

- public RTarget(String name, IndexShard indexShard, RecoveryState.Index recoveryStateIndex, RListener listener) {
+ public ReplicationTarget(String name, IndexShard indexShard, RecoveryState.Index recoveryStateIndex, RListener listener) {
super(name);
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.listener = listener;
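A note on the TODO in the hunk above: ID_GENERATOR is a single static AtomicLong on the abstract base class, so every subclass instance (RecoveryTarget, SegmentReplicationTarget) draws from the same sequence. Generated IDs therefore stay unique across all target types; the per-subclass sequences merely interleave. A minimal standalone sketch of this behavior (hypothetical class names, not from this repository):

    import java.util.concurrent.atomic.AtomicLong;

    abstract class BaseTarget {
        // one counter shared by every subclass of BaseTarget
        private static final AtomicLong ID_GENERATOR = new AtomicLong();
        final long id = ID_GENERATOR.incrementAndGet();
    }

    class RecoveryLikeTarget extends BaseTarget {}

    class ReplicationLikeTarget extends BaseTarget {}

    public class IdGeneratorDemo {
        public static void main(String[] args) {
            System.out.println(new RecoveryLikeTarget().id);    // 1
            System.out.println(new ReplicationLikeTarget().id); // 2 (interleaved, never duplicated)
            System.out.println(new RecoveryLikeTarget().id);    // 3
        }
    }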
======================= changed file =======================
@@ -231,7 +231,7 @@ public void messageReceived(final ReplicationFileChunkRequest request, Transport
ReplicationCollection.ReplicationRef replicationRef = segmentReplicationReplicaService.getOnGoingReplications()
.getReplicationSafe(request.getReplicationId(), request.shardId())
) {
- final ReplicationTarget replicationTarget = replicationRef.get(ReplicationTarget.class);
+ final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class);
final ActionListener<Void> listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
@@ -274,7 +274,7 @@ private ActionListener<Void> createOrFinishListener(
final ReplicationFileChunkRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
- final ReplicationTarget replicationTarget = replicationRef.get(ReplicationTarget.class);
+ final SegmentReplicationTarget replicationTarget = replicationRef.get(SegmentReplicationTarget.class);
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);

(Diffs for the remaining 3 changed files are not shown.)
