diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 37b16212a35ee..e13022afa81ba 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -212,7 +212,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal } private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { - RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); + RecoveryTarget newTarget = onGoingRecoveries.reset(recoveryId, activityTimeout); if (newTarget != null) { threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId())); } @@ -241,7 +241,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final IndexShard indexShard = recoveryTarget.indexShard(); indexShard.preRecovery(); assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; - logger.trace("{} preparing shard for peer recovery", recoveryTarget.indexShard().shardId()); + logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG @@ -292,7 +292,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( long startingSeqNo ) { final StartRecoveryRequest request; - logger.trace("{} collecting local files for [{}]", recoveryTarget.indexShard().shardId(), recoveryTarget.sourceNode()); + logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); Store.MetadataSnapshot metadataSnapshot; try { @@ -335,9 +335,9 @@ public static StartRecoveryRequest getStartRecoveryRequest( } metadataSnapshot = Store.MetadataSnapshot.EMPTY; } - logger.trace("{} local file count [{}]", recoveryTarget.indexShard().shardId(), metadataSnapshot.size()); + logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); request = new StartRecoveryRequest( - recoveryTarget.indexShard().shardId(), + recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), localNode, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 052ae035db30a..92897ab19ad64 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -97,7 +97,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati indexShard.recoveryStats().incCurrentAsTarget(); this.store = indexShard.store(); final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; - this.multiFileWriter = new MultiFileWriter(indexShard.store(), recoveryStateIndex, tempFilePrefix, logger, this::ensureRefCount); + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); store.incRef(); } @@ -149,11 +149,11 @@ public void notifyListener(Exception e, boolean sendShardFailure) { * Closes the current recovery target and waits up to a certain timeout for resources to be freed. * Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done. */ - public boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException { + public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException { final long recoveryId = getId(); if (finished.compareAndSet(false, true)) { try { - logger.debug("reset of recovery with shard {} and id [{}]", indexShard.shardId(), recoveryId); + logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId); } finally { // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now. decRef(); @@ -163,7 +163,7 @@ public boolean resetRecovery(CancellableThreads newTargetCancellableThreads) thr } catch (CancellableThreads.ExecutionCancelledException e) { logger.trace( "new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close", - indexShard.shardId(), + shardId(), recoveryId ); return false; @@ -241,7 +241,7 @@ protected void closeInternal() { @Override public String toString() { - return indexShard.shardId() + " [" + getId() + "]"; + return shardId() + " [" + getId() + "]"; } @Override @@ -332,7 +332,7 @@ public void indexTranslogOperations( translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); if (indexShard().state() != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(indexShard.shardId(), indexShard().state()); + throw new IndexShardNotRecoveringException(shardId(), indexShard().state()); } /* * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation @@ -418,7 +418,7 @@ public void cleanFiles( final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), globalCheckpoint, - indexShard.shardId(), + shardId(), indexShard.getPendingPrimaryTerm() ); store.associateIndexWithNewTranslog(translogUUID); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index e0ac90ef12ba2..609825eb5227b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -72,9 +72,9 @@ public ReplicationCollection(Logger logger, ThreadPool threadPool) { } /** - * Starts are new recovery for the given shard, source node and state + * Starts a new target event for the given shard, source node and state * - * @return the id of the new recovery. + * @return the id of the new target event. */ public long start(T target, TimeValue activityTimeout) { startInternal(target, activityTimeout); @@ -93,21 +93,21 @@ private void startInternal(T target, TimeValue activityTimeout) { } /** - * Resets the recovery and performs a recovery restart on the currently recovering index shard + * Resets the target event and performs a restart on the current index shard * * @see IndexShard#performRecoveryRestart() - * @return newly created RecoveryTarget + * @return newly created Target */ @SuppressWarnings(value = "unchecked") - public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) { + public T reset(final long id, final TimeValue activityTimeout) { T oldTarget = null; final T newTarget; try { synchronized (onGoingTargetEvents) { - // swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by - // cancelRecoveriesForShard whenever the old recovery target is picked up - oldTarget = onGoingTargetEvents.remove(recoveryId); + // swap targets in a synchronized block to ensure that the newly added target is picked up by + // cancelForShard whenever the old target is picked up + oldTarget = onGoingTargetEvents.remove(id); if (oldTarget == null) { return null; } @@ -116,8 +116,8 @@ public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) { startInternal(newTarget, activityTimeout); } - // Closes the current recovery target - boolean successfulReset = oldTarget.resetRecovery(newTarget.cancellableThreads()); + // Closes the current target + boolean successfulReset = oldTarget.reset(newTarget.cancellableThreads()); if (successfulReset) { logger.trace("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId()); return newTarget; @@ -127,7 +127,7 @@ public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) { newTarget.description(), oldTarget.getId() ); - cancel(newTarget.getId(), "recovery cancelled during reset"); + cancel(newTarget.getId(), "cancelled during reset"); return null; } } catch (Exception e) { @@ -147,7 +147,7 @@ public T getTarget(long id) { * to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically * by using this method in a try-with-resources clause. *

- * Returns null if recovery is not found + * Returns null if target event is not found */ public ReplicationRef get(long id) { T status = onGoingTargetEvents.get(id); @@ -203,7 +203,7 @@ public void markAsDone(long id) { } } - /** the number of ongoing recoveries */ + /** the number of ongoing target events */ public int size() { return onGoingTargetEvents.size(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index a8c97a731ac75..0192270907fd2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -12,11 +12,11 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.common.UUIDs; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,11 +37,12 @@ public abstract class ReplicationTarget extends AbstractRefCounted { private final long id; protected final AtomicBoolean finished = new AtomicBoolean(); + private final ShardId shardId; protected final IndexShard indexShard; protected final ReplicationListener listener; protected final Logger logger; protected final CancellableThreads cancellableThreads; - protected final ReplicationLuceneIndex recoveryStateIndex; + protected final ReplicationLuceneIndex stateIndex; protected abstract String getPrefix(); @@ -65,14 +66,14 @@ public CancellableThreads cancellableThreads() { public abstract void notifyListener(Exception e, boolean sendShardFailure); - public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex recoveryStateIndex, ReplicationListener listener) { + public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { super(name); this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); this.listener = listener; this.id = ID_GENERATOR.incrementAndGet(); - this.recoveryStateIndex = recoveryStateIndex; + this.stateIndex = stateIndex; this.indexShard = indexShard; - final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; + this.shardId = indexShard.shardId(); // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); } @@ -81,7 +82,7 @@ public long getId() { return id; } - public abstract boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException; + public abstract boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException; /** * return the last time this ReplicationStatus was used (based on System.nanoTime() @@ -106,6 +107,10 @@ public IndexShard indexShard() { return indexShard; } + public ShardId shardId() { + return shardId; + } + /** * mark the current replication as done */ diff --git a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java similarity index 97% rename from server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java rename to server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index e341dfd086ea4..7587f48503625 100644 --- a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -51,7 +51,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; -public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTestCase { +public class ReplicationCollectionTests extends OpenSearchIndexLevelReplicationTestCase { static final ReplicationListener listener = new ReplicationListener() { @Override public void onDone(ReplicationState state) { @@ -138,7 +138,7 @@ public void testResetRecovery() throws Exception { IndexShard indexShard = recoveryTarget.indexShard(); Store store = recoveryTarget.store(); String tempFileName = recoveryTarget.getTempNameForFile("foobar"); - RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60)); + RecoveryTarget resetRecovery = collection.reset(recoveryId, TimeValue.timeValueMinutes(60)); final long resetRecoveryId = resetRecovery.getId(); assertNotSame(recoveryTarget, resetRecovery); assertNotSame(recoveryTarget.cancellableThreads(), resetRecovery.cancellableThreads());