From 233acbcd4044ba9695b0816419a8df766e8e162f Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Wed, 18 May 2022 15:17:42 -0700 Subject: [PATCH] rename to replication Signed-off-by: Poojita Raj --- .../cluster/IndicesClusterStateService.java | 4 +- .../recovery/PeerRecoveryTargetService.java | 34 ++++++++-------- .../indices/recovery/RecoveryListener.java | 10 ++--- .../indices/recovery/RecoveryState.java | 4 +- .../indices/recovery/RecoveryTarget.java | 12 +++--- ...ection.java => ReplicationCollection.java} | 40 +++++++++---------- ...Listener.java => ReplicationListener.java} | 8 ++-- .../common/ReplicationRequestTracker.java | 2 +- ...TargetState.java => ReplicationState.java} | 2 +- ...hardTarget.java => ReplicationTarget.java} | 14 +++---- .../RecoveryDuringReplicationTests.java | 4 +- .../indices/recovery/RecoveryTests.java | 10 ++--- .../recovery/RecoveriesCollectionTests.java | 40 +++++++++---------- .../index/shard/IndexShardTestCase.java | 10 ++--- 14 files changed, 97 insertions(+), 97 deletions(-) rename server/src/main/java/org/opensearch/indices/replication/common/{ShardTargetCollection.java => ReplicationCollection.java} (86%) rename server/src/main/java/org/opensearch/indices/replication/common/{ShardTargetListener.java => ReplicationListener.java} (61%) rename server/src/main/java/org/opensearch/indices/replication/common/{ShardTargetState.java => ReplicationState.java} (91%) rename server/src/main/java/org/opensearch/indices/replication/common/{ShardTarget.java => ReplicationTarget.java} (91%) diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 29f9511db6528..d1623df156593 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -80,7 +80,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; import org.opensearch.snapshots.SnapshotShardsService; @@ -745,7 +745,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state()); } - public void handleRecoveryDone(ShardTargetState state, ShardRouting shardRouting, long primaryTerm) { + public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState RecState = (RecoveryState) state; shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 34c0c7204884a..37b16212a35ee 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -69,8 +69,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; -import org.opensearch.indices.replication.common.ShardTargetCollection; -import org.opensearch.indices.replication.common.ShardTargetCollection.ShardTargetRef; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; @@ -125,7 +125,7 @@ public static class Actions { private final RecoverySettings recoverySettings; private final ClusterService clusterService; - private final ShardTargetCollection onGoingRecoveries; + private final ReplicationCollection onGoingRecoveries; public PeerRecoveryTargetService( ThreadPool threadPool, @@ -137,7 +137,7 @@ public PeerRecoveryTargetService( this.transportService = transportService; this.recoverySettings = recoverySettings; this.clusterService = clusterService; - this.onGoingRecoveries = new ShardTargetCollection<>(logger, threadPool); + this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool); transportService.registerRequestHandler( Actions.FILES_INFO, @@ -229,7 +229,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final TransportRequest requestToSend; final StartRecoveryRequest startRequest; final ReplicationTimer timer; - try (ShardTargetCollection.ShardTargetRef recoveryRef = onGoingRecoveries.get(recoveryId)) { + try (ReplicationRef recoveryRef = onGoingRecoveries.get(recoveryId)) { if (recoveryRef == null) { logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); return; @@ -353,7 +353,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { - try (ShardTargetRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request); if (listener == null) { return; @@ -368,7 +368,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request); if (listener == null) { return; @@ -384,7 +384,7 @@ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { recoveryRef.get().handoffPrimaryContext(request.primaryContext()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -397,7 +397,7 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = createOrFinishListener( recoveryRef, @@ -417,7 +417,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin private void performTranslogOps( final RecoveryTranslogOperationsRequest request, final ActionListener listener, - final ShardTargetRef recoveryRef + final ReplicationRef recoveryRef ) { final RecoveryTarget recoveryTarget = recoveryRef.get(); @@ -433,7 +433,7 @@ private void performTranslogOps( public void onNewClusterState(ClusterState state) { threadPool.generic().execute(ActionRunnable.wrap(listener, l -> { try ( - ShardTargetRef recoveryRef = onGoingRecoveries.getSafe( + ReplicationRef recoveryRef = onGoingRecoveries.getSafe( request.recoveryId(), request.shardId() ) @@ -483,7 +483,7 @@ class FilesInfoRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request); if (listener == null) { return; @@ -506,7 +506,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request); if (listener == null) { return; @@ -525,7 +525,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { @@ -561,7 +561,7 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha } private ActionListener createOrFinishListener( - final ShardTargetRef recoveryRef, + final ReplicationRef recoveryRef, final TransportChannel channel, final String action, final RecoveryTransportRequest request @@ -570,7 +570,7 @@ private ActionListener createOrFinishListener( } private ActionListener createOrFinishListener( - final ShardTargetRef recoveryRef, + final ReplicationRef recoveryRef, final TransportChannel channel, final String action, final RecoveryTransportRequest request, @@ -607,7 +607,7 @@ class RecoveryRunner extends AbstractRunnable { @Override public void onFailure(Exception e) { - try (ShardTargetRef recoveryRef = onGoingRecoveries.get(recoveryId)) { + try (ReplicationRef recoveryRef = onGoingRecoveries.get(recoveryId)) { if (recoveryRef != null) { logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e); onGoingRecoveries.fail( diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java index 944fa35ad8210..b93c054ffa4bf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java @@ -11,15 +11,15 @@ import org.opensearch.OpenSearchException; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.indices.cluster.IndicesClusterStateService; -import org.opensearch.indices.replication.common.ShardTargetListener; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; /** * Listener that runs on changes in Recovery state * * @opensearch.internal */ -public class RecoveryListener implements ShardTargetListener { +public class RecoveryListener implements ReplicationListener { /** * ShardRouting with which the shard was created @@ -44,12 +44,12 @@ public RecoveryListener( } @Override - public void onDone(ShardTargetState state) { + public void onDone(ReplicationState state) { indicesClusterStateService.handleRecoveryDone(state, shardRouting, primaryTerm); } @Override - public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index 293a77e87f2f0..a3c7adb755145 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -45,7 +45,7 @@ import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; @@ -57,7 +57,7 @@ * * @opensearch.internal */ -public class RecoveryState implements ShardTargetState, ToXContentFragment, Writeable { +public class RecoveryState implements ReplicationState, ToXContentFragment, Writeable { /** * The stage of the recovery state diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 6459155e72cac..052ae035db30a 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -56,9 +56,9 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; -import org.opensearch.indices.replication.common.ShardTarget; -import org.opensearch.indices.replication.common.ShardTargetListener; -import org.opensearch.indices.replication.common.ShardTargetCollection; +import org.opensearch.indices.replication.common.ReplicationTarget; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationCollection; import java.io.IOException; import java.nio.file.Path; @@ -67,11 +67,11 @@ /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of - * this class are created through {@link ShardTargetCollection}. + * this class are created through {@link ReplicationCollection}. * * @opensearch.internal */ -public class RecoveryTarget extends ShardTarget implements RecoveryTargetHandler { +public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler { private static final String RECOVERY_PREFIX = "recovery."; @@ -90,7 +90,7 @@ public class RecoveryTarget extends ShardTarget implements RecoveryTargetHandler * @param sourceNode source node of the recovery where we recover from * @param listener called when recovery is completed/failed */ - public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ShardTargetListener listener) { + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); this.cancellableThreads = new CancellableThreads(); this.sourceNode = sourceNode; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java similarity index 86% rename from server/src/main/java/org/opensearch/indices/replication/common/ShardTargetCollection.java rename to server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index 079a60526ea74..e0ac90ef12ba2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -51,14 +51,14 @@ import java.util.concurrent.ConcurrentMap; /** - * This class holds a collection of all on going events on the current node (i.e., the node is the target node - * of those events). The class is used to guarantee concurrent semantics such that once a recoveries was done/cancelled/failed - * no other thread will be able to find it. Last, the {@link ShardTargetRef} inner class verifies that recovery temporary files + * This class holds a collection of all on going replication events on the current node (i.e., the node is the target node + * of those events). The class is used to guarantee concurrent semantics such that once an event was done/cancelled/failed + * no other thread will be able to find it. Last, the {@link ReplicationRef} inner class verifies that temporary files * and store will only be cleared once on going usage is finished. * * @opensearch.internal */ -public class ShardTargetCollection { +public class ReplicationCollection { /** This is the single source of truth for ongoing target events. If it's not here, it was canceled or done */ private final ConcurrentMap onGoingTargetEvents = ConcurrentCollections.newConcurrentMap(); @@ -66,7 +66,7 @@ public class ShardTargetCollection { private final Logger logger; private final ThreadPool threadPool; - public ShardTargetCollection(Logger logger, ThreadPool threadPool) { + public ReplicationCollection(Logger logger, ThreadPool threadPool) { this.logger = logger; this.threadPool = threadPool; } @@ -86,7 +86,7 @@ private void startInternal(T target, TimeValue activityTimeout) { assert existingTarget == null : "found two Target instances with the same id"; logger.trace("started {}", target.description()); threadPool.schedule( - new ShardTargetMonitor(target.getId(), target.lastAccessTime(), activityTimeout), + new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout), activityTimeout, ThreadPool.Names.GENERIC ); @@ -143,23 +143,23 @@ public T getTarget(long id) { } /** - * gets the {@link ShardTarget } for a given id. The ShardTarget returned has it's ref count already incremented - * to make sure it's safe to use. However, you must call {@link ShardTarget#decRef()} when you are done with it, typically + * gets the {@link ReplicationTarget } for a given id. The ShardTarget returned has it's ref count already incremented + * to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically * by using this method in a try-with-resources clause. *

* Returns null if recovery is not found */ - public ShardTargetRef get(long id) { + public ReplicationRef get(long id) { T status = onGoingTargetEvents.get(id); if (status != null && status.tryIncRef()) { - return new ShardTargetRef(status); + return new ReplicationRef(status); } return null; } /** Similar to {@link #get(long)} but throws an exception if no target is found */ - public ShardTargetRef getSafe(long id, ShardId shardId) { - ShardTargetRef ref = get(id); + public ReplicationRef getSafe(long id, ShardId shardId) { + ReplicationRef ref = get(id); if (ref == null) { throw new IndexShardClosedException(shardId); } @@ -236,31 +236,31 @@ public boolean cancelForShard(ShardId shardId, String reason) { } /** - * a reference to {@link ShardTarget}, which implements {@link AutoCloseable}. closing the reference - * causes {@link ShardTarget#decRef()} to be called. This makes sure that the underlying resources - * will not be freed until {@link ShardTargetRef#close()} is called. + * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference + * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources + * will not be freed until {@link ReplicationRef#close()} is called. * * @opensearch.internal */ - public static class ShardTargetRef extends AutoCloseableRefCounted { + public static class ReplicationRef extends AutoCloseableRefCounted { /** - * Important: {@link ShardTarget#tryIncRef()} should + * Important: {@link ReplicationTarget#tryIncRef()} should * be *successfully* called on status before */ - public ShardTargetRef(T status) { + public ReplicationRef(T status) { super(status); status.setLastAccessTime(); } } - private class ShardTargetMonitor extends AbstractRunnable { + private class ReplicationMonitor extends AbstractRunnable { private final long id; private final TimeValue checkInterval; private volatile long lastSeenAccessTime; - private ShardTargetMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) { + private ReplicationMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) { this.id = id; this.checkInterval = checkInterval; this.lastSeenAccessTime = lastSeenAccessTime; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetListener.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java similarity index 61% rename from server/src/main/java/org/opensearch/indices/replication/common/ShardTargetListener.java rename to server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java index 7517057a16f88..0666f475d496a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java @@ -11,13 +11,13 @@ import org.opensearch.OpenSearchException; /** - * Interface for listeners that run when there's a change in replication state + * Interface for listeners that run when there's a change in {@link ReplicationState} * * @opensearch.internal */ -public interface ShardTargetListener { +public interface ReplicationListener { - void onDone(ShardTargetState state); + void onDone(ReplicationState state); - void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure); + void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java index 7ac31b5a49ea2..0b0d20fc9f17e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java @@ -45,7 +45,7 @@ import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; /** - * Tracks replication/recovery requests + * Tracks replication requests * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetState.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java similarity index 91% rename from server/src/main/java/org/opensearch/indices/replication/common/ShardTargetState.java rename to server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java index d66ec99e0a6db..7942fa8938dd0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ShardTargetState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java @@ -13,6 +13,6 @@ * * @opensearch.internal */ -public interface ShardTargetState { +public interface ReplicationState { } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ShardTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java similarity index 91% rename from server/src/main/java/org/opensearch/indices/replication/common/ShardTarget.java rename to server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 952cbd2f1d683..a8c97a731ac75 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ShardTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -23,11 +23,11 @@ import java.util.concurrent.atomic.AtomicLong; /** - * Represents the target of an operation performed on a shard + * Represents the target of a replication operation performed on a shard * * @opensearch.internal */ -public abstract class ShardTarget extends AbstractRefCounted { +public abstract class ReplicationTarget extends AbstractRefCounted { private static final AtomicLong ID_GENERATOR = new AtomicLong(); @@ -38,7 +38,7 @@ public abstract class ShardTarget extends AbstractRefCounted { protected final AtomicBoolean finished = new AtomicBoolean(); protected final IndexShard indexShard; - protected final ShardTargetListener listener; + protected final ReplicationListener listener; protected final Logger logger; protected final CancellableThreads cancellableThreads; protected final ReplicationLuceneIndex recoveryStateIndex; @@ -49,13 +49,13 @@ public abstract class ShardTarget extends AbstractRefCounted { protected abstract void onCancel(String reason); - public abstract ShardTargetState state(); + public abstract ReplicationState state(); - public abstract ShardTarget retryCopy(); + public abstract ReplicationTarget retryCopy(); public abstract String description(); - public ShardTargetListener getListener() { + public ReplicationListener getListener() { return listener; } @@ -65,7 +65,7 @@ public CancellableThreads cancellableThreads() { public abstract void notifyListener(Exception e, boolean sendShardFailure); - public ShardTarget(String name, IndexShard indexShard, ReplicationLuceneIndex recoveryStateIndex, ShardTargetListener listener) { + public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex recoveryStateIndex, ReplicationListener listener) { super(name); this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); this.listener = listener; diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index dcf54a0ecbfcf..509d1f52daa0d 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -71,7 +71,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.common.ShardTargetListener; +import org.opensearch.indices.replication.common.ReplicationListener; import java.io.IOException; import java.util.ArrayList; @@ -809,7 +809,7 @@ public BlockingTarget( CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, - ShardTargetListener listener, + ReplicationListener listener, Logger logger ) { super(shard, sourceNode, listener); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 6428a775a92de..5224a54a35e96 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -69,8 +69,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.replication.common.ShardTargetListener; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import java.io.IOException; import java.util.HashMap; @@ -451,14 +451,14 @@ public long addDocument(Iterable doc) throws IOExcepti IndexShard replica = group.addReplica(); expectThrows( Exception.class, - () -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new ShardTargetListener() { + () -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new ReplicationListener() { @Override - public void onDone(ShardTargetState state) { + public void onDone(ReplicationState state) { throw new AssertionError("recovery must fail"); } @Override - public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated")); } })) diff --git a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java index e77b0753d5497..e341dfd086ea4 100644 --- a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java @@ -38,9 +38,9 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; -import org.opensearch.indices.replication.common.ShardTargetCollection; -import org.opensearch.indices.replication.common.ShardTargetListener; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -52,26 +52,26 @@ import static org.hamcrest.Matchers.lessThan; public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTestCase { - static final ShardTargetListener listener = new ShardTargetListener() { + static final ReplicationListener listener = new ReplicationListener() { @Override - public void onDone(ShardTargetState state) { + public void onDone(ReplicationState state) { } @Override - public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { } }; public void testLastAccessTimeUpdate() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final ShardTargetCollection collection = new ShardTargetCollection<>(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); - try (ShardTargetCollection.ShardTargetRef status = collection.get(recoveryId)) { + try (ReplicationCollection.ReplicationRef status = collection.get(recoveryId)) { final long lastSeenTime = status.get().lastAccessTime(); assertBusy(() -> { - try (ShardTargetCollection.ShardTargetRef currentStatus = collection.get(recoveryId)) { + try (ReplicationCollection.ReplicationRef currentStatus = collection.get(recoveryId)) { assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.get().lastAccessTime())); } }); @@ -83,17 +83,17 @@ public void testLastAccessTimeUpdate() throws Exception { public void testRecoveryTimeout() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final ShardTargetCollection collection = new ShardTargetCollection<>(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final AtomicBoolean failed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); - final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new ShardTargetListener() { + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new ReplicationListener() { @Override - public void onDone(ShardTargetState state) { + public void onDone(ReplicationState state) { latch.countDown(); } @Override - public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { failed.set(true); latch.countDown(); } @@ -110,10 +110,10 @@ public void onFailure(ShardTargetState state, OpenSearchException e, boolean sen public void testRecoveryCancellation() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final ShardTargetCollection collection = new ShardTargetCollection<>(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); - try (ShardTargetCollection.ShardTargetRef recoveryRef = collection.get(recoveryId)) { + try (ReplicationCollection.ReplicationRef recoveryRef = collection.get(recoveryId)) { ShardId shardId = recoveryRef.get().indexShard().shardId(); assertTrue("failed to cancel recoveries", collection.cancelForShard(shardId, "test")); assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); @@ -129,7 +129,7 @@ public void testResetRecovery() throws Exception { shards.startAll(); int numDocs = randomIntBetween(1, 15); shards.indexDocs(numDocs); - final ShardTargetCollection collection = new ShardTargetCollection<>(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); IndexShard shard = shards.addReplica(); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); RecoveryTarget recoveryTarget = collection.getTarget(recoveryId); @@ -152,7 +152,7 @@ public void testResetRecovery() throws Exception { String resetTempFileName = resetRecovery.getTempNameForFile("foobar"); assertNotEquals(tempFileName, resetTempFileName); assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); - try (ShardTargetCollection.ShardTargetRef newRecoveryRef = collection.get(resetRecoveryId)) { + try (ReplicationCollection.ReplicationRef newRecoveryRef = collection.get(resetRecoveryId)) { shards.recoverReplica(shard, (s, n) -> { assertSame(s, newRecoveryRef.get().indexShard()); return newRecoveryRef.get(); @@ -163,15 +163,15 @@ public void testResetRecovery() throws Exception { } } - long startRecovery(ShardTargetCollection collection, DiscoveryNode sourceNode, IndexShard shard) { + long startRecovery(ReplicationCollection collection, DiscoveryNode sourceNode, IndexShard shard) { return startRecovery(collection, sourceNode, shard, listener, TimeValue.timeValueMinutes(60)); } long startRecovery( - ShardTargetCollection collection, + ReplicationCollection collection, DiscoveryNode sourceNode, IndexShard indexShard, - ShardTargetListener listener, + ReplicationListener listener, TimeValue timeValue ) { final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 36c73963b9c26..298fdcaea6465 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -94,8 +94,8 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; -import org.opensearch.indices.replication.common.ShardTargetListener; -import org.opensearch.indices.replication.common.ShardTargetState; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; @@ -141,14 +141,14 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { } }; - protected static final ShardTargetListener recoveryListener = new ShardTargetListener() { + protected static final ReplicationListener recoveryListener = new ReplicationListener() { @Override - public void onDone(ShardTargetState state) { + public void onDone(ReplicationState state) { } @Override - public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { throw new AssertionError(e); } };