Skip to content

Commit

Permalink
rename and doc changes
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 19, 2022
1 parent 233acbc commit 50c4e6e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal
}

private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
RecoveryTarget newTarget = onGoingRecoveries.reset(recoveryId, activityTimeout);
if (newTarget != null) {
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId()));
}
Expand Down Expand Up @@ -241,7 +241,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.indexShard().shardId());
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
Expand Down Expand Up @@ -292,7 +292,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
long startingSeqNo
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.indexShard().shardId(), recoveryTarget.sourceNode());
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
Expand Down Expand Up @@ -335,9 +335,9 @@ public static StartRecoveryRequest getStartRecoveryRequest(
}
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
}
logger.trace("{} local file count [{}]", recoveryTarget.indexShard().shardId(), metadataSnapshot.size());
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
request = new StartRecoveryRequest(
recoveryTarget.indexShard().shardId(),
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
localNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati
indexShard.recoveryStats().incCurrentAsTarget();
this.store = indexShard.store();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), recoveryStateIndex, tempFilePrefix, logger, this::ensureRefCount);
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
store.incRef();
}

Expand Down Expand Up @@ -149,11 +149,11 @@ public void notifyListener(Exception e, boolean sendShardFailure) {
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
*/
public boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException {
public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException {
final long recoveryId = getId();
if (finished.compareAndSet(false, true)) {
try {
logger.debug("reset of recovery with shard {} and id [{}]", indexShard.shardId(), recoveryId);
logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now.
decRef();
Expand All @@ -163,7 +163,7 @@ public boolean resetRecovery(CancellableThreads newTargetCancellableThreads) thr
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace(
"new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close",
indexShard.shardId(),
shardId(),
recoveryId
);
return false;
Expand Down Expand Up @@ -241,7 +241,7 @@ protected void closeInternal() {

@Override
public String toString() {
return indexShard.shardId() + " [" + getId() + "]";
return shardId() + " [" + getId() + "]";
}

@Override
Expand Down Expand Up @@ -332,7 +332,7 @@ public void indexTranslogOperations(
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(indexShard.shardId(), indexShard().state());
throw new IndexShardNotRecoveringException(shardId(), indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
Expand Down Expand Up @@ -418,7 +418,7 @@ public void cleanFiles(
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
globalCheckpoint,
indexShard.shardId(),
shardId(),
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public ReplicationCollection(Logger logger, ThreadPool threadPool) {
}

/**
* Starts are new recovery for the given shard, source node and state
* Starts a new target event for the given shard, source node and state
*
* @return the id of the new recovery.
* @return the id of the new target event.
*/
public long start(T target, TimeValue activityTimeout) {
startInternal(target, activityTimeout);
Expand All @@ -93,21 +93,21 @@ private void startInternal(T target, TimeValue activityTimeout) {
}

/**
* Resets the recovery and performs a recovery restart on the currently recovering index shard
* Resets the target event and performs a restart on the current index shard
*
* @see IndexShard#performRecoveryRestart()
* @return newly created RecoveryTarget
* @return newly created Target
*/
@SuppressWarnings(value = "unchecked")
public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
public T reset(final long id, final TimeValue activityTimeout) {
T oldTarget = null;
final T newTarget;

try {
synchronized (onGoingTargetEvents) {
// swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by
// cancelRecoveriesForShard whenever the old recovery target is picked up
oldTarget = onGoingTargetEvents.remove(recoveryId);
// swap targets in a synchronized block to ensure that the newly added target is picked up by
// cancelForShard whenever the old target is picked up
oldTarget = onGoingTargetEvents.remove(id);
if (oldTarget == null) {
return null;
}
Expand All @@ -116,8 +116,8 @@ public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
startInternal(newTarget, activityTimeout);
}

// Closes the current recovery target
boolean successfulReset = oldTarget.resetRecovery(newTarget.cancellableThreads());
// Closes the current target
boolean successfulReset = oldTarget.reset(newTarget.cancellableThreads());
if (successfulReset) {
logger.trace("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId());
return newTarget;
Expand All @@ -127,7 +127,7 @@ public T resetRecovery(final long recoveryId, final TimeValue activityTimeout) {
newTarget.description(),
oldTarget.getId()
);
cancel(newTarget.getId(), "recovery cancelled during reset");
cancel(newTarget.getId(), "cancelled during reset");
return null;
}
} catch (Exception e) {
Expand All @@ -147,7 +147,7 @@ public T getTarget(long id) {
* to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p>
* Returns null if recovery is not found
* Returns null if target event is not found
*/
public ReplicationRef<T> get(long id) {
T status = onGoingTargetEvents.get(id);
Expand Down Expand Up @@ -203,7 +203,7 @@ public void markAsDone(long id) {
}
}

/** the number of ongoing recoveries */
/**
 * Returns the number of ongoing target events currently held in this collection.
 */
public int size() {
return onGoingTargetEvents.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.UUIDs;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -37,11 +37,12 @@ public abstract class ReplicationTarget extends AbstractRefCounted {
private final long id;

protected final AtomicBoolean finished = new AtomicBoolean();
private final ShardId shardId;
protected final IndexShard indexShard;
protected final ReplicationListener listener;
protected final Logger logger;
protected final CancellableThreads cancellableThreads;
protected final ReplicationLuceneIndex recoveryStateIndex;
protected final ReplicationLuceneIndex stateIndex;

/**
 * Supplies the prefix that starts temporary file names for this target; the RecoveryTarget
 * constructor appends a random base64 UUID and a trailing "." to it.
 */
protected abstract String getPrefix();

Expand All @@ -65,14 +66,14 @@ public CancellableThreads cancellableThreads() {

/**
 * Subclass hook taking an exception and a shard-failure flag; RecoveryTarget provides an implementation.
 * NOTE(review): presumably forwards {@code e} to this target's listener, and {@code sendShardFailure}
 * presumably controls whether the shard is also reported as failed. Neither is visible here, so
 * confirm against the implementing subclasses.
 */
public abstract void notifyListener(Exception e, boolean sendShardFailure);

public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex recoveryStateIndex, ReplicationListener listener) {
public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.listener = listener;
this.id = ID_GENERATOR.incrementAndGet();
this.recoveryStateIndex = recoveryStateIndex;
this.stateIndex = stateIndex;
this.indexShard = indexShard;
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.shardId = indexShard.shardId();
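// NOTE(review): no store reference is taken in this constructor. RecoveryTarget calls store.incRef()
// in its own constructor to make sure the store is not released until it is done.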
this.cancellableThreads = new CancellableThreads();
}
Expand All @@ -81,7 +82,7 @@ public long getId() {
return id;
}

public abstract boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException;
/**
 * Resets this target so that a replacement target can take over.
 *
 * @param newTargetCancellableThreads the cancellable threads of the replacement target
 *                                    (ReplicationCollection passes {@code newTarget.cancellableThreads()})
 * @return true if the reset succeeded; the RecoveryTarget implementation documents that it returns false
 *         when the target is already cancelled, failed or marked as done
 * @throws IOException declared for implementations; the conditions that raise it are not visible here
 */
public abstract boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException;

/**
* return the last time this ReplicationStatus was used (based on System.nanoTime())
Expand All @@ -106,6 +107,10 @@ public IndexShard indexShard() {
return indexShard;
}

/**
 * Returns the shard id that was read from the index shard when this target was constructed.
 */
public ShardId shardId() {
return shardId;
}

/**
* mark the current replication as done
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTestCase {
public class ReplicationCollectionTests extends OpenSearchIndexLevelReplicationTestCase {
static final ReplicationListener listener = new ReplicationListener() {
@Override
public void onDone(ReplicationState state) {
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testResetRecovery() throws Exception {
IndexShard indexShard = recoveryTarget.indexShard();
Store store = recoveryTarget.store();
String tempFileName = recoveryTarget.getTempNameForFile("foobar");
RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60));
RecoveryTarget resetRecovery = collection.reset(recoveryId, TimeValue.timeValueMinutes(60));
final long resetRecoveryId = resetRecovery.getId();
assertNotSame(recoveryTarget, resetRecovery);
assertNotSame(recoveryTarget.cancellableThreads(), resetRecovery.cancellableThreads());
Expand Down

0 comments on commit 50c4e6e

Please sign in to comment.