Skip to content

Commit

Permalink
Starting to merge common replication functionality into parent classes
Browse files Browse the repository at this point in the history
Parent class names are currently temporary placeholders. They will renamed in a separate commit to avoid a convoluted commit diff.

A large part of duplicate functionality in the two *Target classes has been moved into the parent class. The two *Listeners are also very similar and now implement a common interface. The usec-ase specific methods in the listeners will eventually be replaced by their generic equivalents. Finally, the State parent class is currently a placeholder and will soon hold more logic.

Some other parts of the code were also changed to make call patterns more streamlined.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Mar 11, 2022
1 parent cc0979c commit 76f09bb
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.RListener;
import org.opensearch.indices.replication.common.RState;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -347,7 +349,18 @@ public static StartRecoveryRequest getStartRecoveryRequest(
return request;
}

public interface RecoveryListener {
public interface RecoveryListener extends RListener {

@Override
default void onDone(RState state) {
onRecoveryDone((RecoveryState) state);
}

@Override
default void onFailure(RState state, OpenSearchException e, boolean sendShardFailure) {
onRecoveryFailure((RecoveryState) state, (RecoveryFailedException) e, sendShardFailure);
}

void onRecoveryDone(RecoveryState state);

void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activ
return null;
}

newRecoveryTarget = oldRecoveryTarget.retryCopy();
// Copy the RecoveryTarget to retry recovery from the same source node onto the same shard and using the same listener.
newRecoveryTarget = new RecoveryTarget(
oldRecoveryTarget.indexShard(),
oldRecoveryTarget.sourceNode(),
oldRecoveryTarget.getListener()
);
startRecoveryInternal(newRecoveryTarget, activityTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.replication.common.RState;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -63,7 +64,7 @@
/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContentFragment, Writeable {
public class RecoveryState implements ToXContentFragment, Writeable, RState {

public enum Stage {
INIT((byte) 0),
Expand Down
209 changes: 53 additions & 156 deletions server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,18 @@

package org.opensearch.indices.recovery;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.ReplicationTracker;
Expand All @@ -60,42 +56,23 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.RTarget;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link RecoveriesCollection}.
*/
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {

private final Logger logger;

private static final AtomicLong idGenerator = new AtomicLong();
public class RecoveryTarget extends RTarget implements RecoveryTargetHandler {

private static final String RECOVERY_PREFIX = "recovery.";

private final ShardId shardId;
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final MultiFileWriter multiFileWriter;
private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;

private final AtomicBoolean finished = new AtomicBoolean();

private final CancellableThreads cancellableThreads;

// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

Expand All @@ -107,89 +84,83 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(
indexShard.store(),
indexShard.recoveryState().getIndex(),
tempFilePrefix,
logger,
this::ensureRefCount
);
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
}

/**
* Returns a fresh recovery target to retry recovery from the same source node onto the same shard and using the same listener.
*
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener);
@Override
protected String getPrefix() {
return RECOVERY_PREFIX;
}

public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
@Override
protected void onDone() {
indexShard.postRecovery("peer recovery done");
}

public long recoveryId() {
return recoveryId;
@Override
public RecoveryState state() {
return indexShard.recoveryState();
}

public ShardId shardId() {
return shardId;
/**
* if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
}

public IndexShard indexShard() {
ensureRefCount();
return indexShard;
@Override
protected void onFail(OpenSearchException e, boolean sendShardFailure) {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
}

public DiscoveryNode sourceNode() {
return this.sourceNode;
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
super.fail(e, sendShardFailure);
}

public RecoveryState state() {
return indexShard.recoveryState();
@Override
protected void closeInternal() {
try {
super.closeInternal();
} finally {
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
}
}

public CancellableThreads cancellableThreads() {
return cancellableThreads;
public PeerRecoveryTargetService.RecoveryListener getListener() {
return (PeerRecoveryTargetService.RecoveryListener) listener;
}

/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
public long recoveryId() {
return getId();
}

/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
public ShardId shardId() {
return indexShard.shardId();
}

public Store store() {
ensureRefCount();
return store;
public DiscoveryNode sourceNode() {
return this.sourceNode;
}

public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

/**
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
*/
boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException {
final long recoveryId = getId();
if (finished.compareAndSet(false, true)) {
try {
logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId);
logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now.
decRef();
Expand All @@ -199,7 +170,7 @@ boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOE
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace(
"new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close",
shardId,
shardId(),
recoveryId
);
return false;
Expand All @@ -219,89 +190,13 @@ boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOE
return false;
}

/**
* cancel the recovery. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
* <p>
* if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
try {
logger.debug("recovery canceled (reason: [{}])", reason);
cancellableThreads.cancel(reason);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}

/**
* fail the recovery and call listener
*
* @param e exception that encapsulating the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}

public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) {
listener.onRecoveryFailure(state(), e, sendShardFailure);
}

/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed";
try {
// this might still throw an exception ie. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
indexShard.postRecovery("peer recovery done");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onRecoveryDone(state());
}
}

@Override
protected void closeInternal() {
try {
multiFileWriter.close();
} finally {
// free store. increment happens in constructor
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
}
listener.onFailure(state(), e, sendShardFailure);
}

@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
}

private void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
"RecoveryStatus is used but it's refcount is 0. Probably a mismatch between incRef/decRef " + "calls"
);
}
return shardId() + " [" + getId() + "]";
}

/*** Implementation of {@link RecoveryTargetHandler } */
Expand Down Expand Up @@ -371,7 +266,7 @@ public void indexTranslogOperations(
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
throw new IndexShardNotRecoveringException(shardId(), indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
Expand Down Expand Up @@ -457,7 +352,7 @@ public void cleanFiles(
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
globalCheckpoint,
shardId,
shardId(),
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
Expand Down Expand Up @@ -510,6 +405,8 @@ public void writeFileChunk(
int totalTranslogOps,
ActionListener<Void> listener
) {
state().getTranslog().totalOperations(totalTranslogOps);
this.writeFileChunk(fileMetadata, position, content, lastChunk, listener);
try {
state().getTranslog().totalOperations(totalTranslogOps);
multiFileWriter.writeFileChunk(fileMetadata, position, content, lastChunk);
Expand Down
Loading

0 comments on commit 76f09bb

Please sign in to comment.