Skip to content

Commit

Permalink
rename to replication
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 18, 2022
1 parent 47379df commit 233acbc
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ShardTargetState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -745,7 +745,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}

public void handleRecoveryDone(ShardTargetState state, ShardRouting shardRouting, long primaryTerm) {
public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState RecState = (RecoveryState) state;
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.common.ShardTargetCollection;
import org.opensearch.indices.replication.common.ShardTargetCollection.ShardTargetRef;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -125,7 +125,7 @@ public static class Actions {
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;

private final ShardTargetCollection<RecoveryTarget> onGoingRecoveries;
private final ReplicationCollection<RecoveryTarget> onGoingRecoveries;

public PeerRecoveryTargetService(
ThreadPool threadPool,
Expand All @@ -137,7 +137,7 @@ public PeerRecoveryTargetService(
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new ShardTargetCollection<>(logger, threadPool);
this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool);

transportService.registerRequestHandler(
Actions.FILES_INFO,
Expand Down Expand Up @@ -229,7 +229,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final TransportRequest requestToSend;
final StartRecoveryRequest startRequest;
final ReplicationTimer timer;
try (ShardTargetCollection.ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
Expand Down Expand Up @@ -353,7 +353,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand

@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
Expand All @@ -368,7 +368,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery

@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
if (listener == null) {
return;
Expand All @@ -384,7 +384,7 @@ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<Rec
@Override
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel, Task task)
throws Exception {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
recoveryRef.get().handoffPrimaryContext(request.primaryContext());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
Expand All @@ -397,7 +397,7 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler<Recove
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, Task task)
throws IOException {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(
recoveryRef,
Expand All @@ -417,7 +417,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
private void performTranslogOps(
final RecoveryTranslogOperationsRequest request,
final ActionListener<Void> listener,
final ShardTargetRef<RecoveryTarget> recoveryRef
final ReplicationRef<RecoveryTarget> recoveryRef
) {
final RecoveryTarget recoveryTarget = recoveryRef.get();

Expand All @@ -433,7 +433,7 @@ private void performTranslogOps(
public void onNewClusterState(ClusterState state) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l -> {
try (
ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(
ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(
request.recoveryId(),
request.shardId()
)
Expand Down Expand Up @@ -483,7 +483,7 @@ class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesIn

@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
Expand All @@ -506,7 +506,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF

@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
Expand All @@ -525,7 +525,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler<Recove

@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
Expand Down Expand Up @@ -561,7 +561,7 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
}

private ActionListener<Void> createOrFinishListener(
final ShardTargetRef<RecoveryTarget> recoveryRef,
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
Expand All @@ -570,7 +570,7 @@ private ActionListener<Void> createOrFinishListener(
}

private ActionListener<Void> createOrFinishListener(
final ShardTargetRef<RecoveryTarget> recoveryRef,
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
Expand Down Expand Up @@ -607,7 +607,7 @@ class RecoveryRunner extends AbstractRunnable {

@Override
public void onFailure(Exception e) {
try (ShardTargetRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
if (recoveryRef != null) {
logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e);
onGoingRecoveries.fail(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.replication.common.ShardTargetListener;
import org.opensearch.indices.replication.common.ShardTargetState;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;

/**
* Listener that runs on changes in Recovery state
*
* @opensearch.internal
*/
public class RecoveryListener implements ShardTargetListener {
public class RecoveryListener implements ReplicationListener {

/**
* ShardRouting with which the shard was created
Expand All @@ -44,12 +44,12 @@ public RecoveryListener(
}

@Override
public void onDone(ShardTargetState state) {
public void onDone(ReplicationState state) {
indicesClusterStateService.handleRecoveryDone(state, shardRouting, primaryTerm);
}

@Override
public void onFailure(ShardTargetState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ShardTargetState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;

Expand All @@ -57,7 +57,7 @@
*
* @opensearch.internal
*/
public class RecoveryState implements ShardTargetState, ToXContentFragment, Writeable {
public class RecoveryState implements ReplicationState, ToXContentFragment, Writeable {

/**
* The stage of the recovery state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ShardTarget;
import org.opensearch.indices.replication.common.ShardTargetListener;
import org.opensearch.indices.replication.common.ShardTargetCollection;
import org.opensearch.indices.replication.common.ReplicationTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationCollection;

import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -67,11 +67,11 @@

/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link ShardTargetCollection}.
* this class are created through {@link ReplicationCollection}.
*
* @opensearch.internal
*/
public class RecoveryTarget extends ShardTarget implements RecoveryTargetHandler {
public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler {

private static final String RECOVERY_PREFIX = "recovery.";

Expand All @@ -90,7 +90,7 @@ public class RecoveryTarget extends ShardTarget implements RecoveryTargetHandler
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ShardTargetListener listener) {
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.cancellableThreads = new CancellableThreads();
this.sourceNode = sourceNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@
import java.util.concurrent.ConcurrentMap;

/**
* This class holds a collection of all on going events on the current node (i.e., the node is the target node
* of those events). The class is used to guarantee concurrent semantics such that once a recoveries was done/cancelled/failed
* no other thread will be able to find it. Last, the {@link ShardTargetRef} inner class verifies that recovery temporary files
* This class holds a collection of all on going replication events on the current node (i.e., the node is the target node
* of those events). The class is used to guarantee concurrent semantics such that once an event was done/cancelled/failed
* no other thread will be able to find it. Last, the {@link ReplicationRef} inner class verifies that temporary files
* and store will only be cleared once on going usage is finished.
*
* @opensearch.internal
*/
public class ShardTargetCollection<T extends ShardTarget> {
public class ReplicationCollection<T extends ReplicationTarget> {

/** This is the single source of truth for ongoing target events. If it's not here, it was canceled or done */
private final ConcurrentMap<Long, T> onGoingTargetEvents = ConcurrentCollections.newConcurrentMap();

private final Logger logger;
private final ThreadPool threadPool;

public ShardTargetCollection(Logger logger, ThreadPool threadPool) {
public ReplicationCollection(Logger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
}
Expand All @@ -86,7 +86,7 @@ private void startInternal(T target, TimeValue activityTimeout) {
assert existingTarget == null : "found two Target instances with the same id";
logger.trace("started {}", target.description());
threadPool.schedule(
new ShardTargetMonitor(target.getId(), target.lastAccessTime(), activityTimeout),
new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
);
Expand Down Expand Up @@ -143,23 +143,23 @@ public T getTarget(long id) {
}

/**
* gets the {@link ShardTarget } for a given id. The ShardTarget returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link ShardTarget#decRef()} when you are done with it, typically
* gets the {@link ReplicationTarget } for a given id. The ShardTarget returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p>
* Returns null if recovery is not found
*/
public ShardTargetRef<T> get(long id) {
public ReplicationRef<T> get(long id) {
T status = onGoingTargetEvents.get(id);
if (status != null && status.tryIncRef()) {
return new ShardTargetRef<T>(status);
return new ReplicationRef<T>(status);
}
return null;
}

/** Similar to {@link #get(long)} but throws an exception if no target is found */
public ShardTargetRef<T> getSafe(long id, ShardId shardId) {
ShardTargetRef<T> ref = get(id);
public ReplicationRef<T> getSafe(long id, ShardId shardId) {
ReplicationRef<T> ref = get(id);
if (ref == null) {
throw new IndexShardClosedException(shardId);
}
Expand Down Expand Up @@ -236,31 +236,31 @@ public boolean cancelForShard(ShardId shardId, String reason) {
}

/**
* a reference to {@link ShardTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ShardTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link ShardTargetRef#close()} is called.
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link ReplicationRef#close()} is called.
*
* @opensearch.internal
*/
public static class ShardTargetRef<T extends ShardTarget> extends AutoCloseableRefCounted<T> {
public static class ReplicationRef<T extends ReplicationTarget> extends AutoCloseableRefCounted<T> {

/**
* Important: {@link ShardTarget#tryIncRef()} should
* Important: {@link ReplicationTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public ShardTargetRef(T status) {
public ReplicationRef(T status) {
super(status);
status.setLastAccessTime();
}
}

private class ShardTargetMonitor extends AbstractRunnable {
private class ReplicationMonitor extends AbstractRunnable {
private final long id;
private final TimeValue checkInterval;

private volatile long lastSeenAccessTime;

private ShardTargetMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) {
private ReplicationMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) {
this.id = id;
this.checkInterval = checkInterval;
this.lastSeenAccessTime = lastSeenAccessTime;
Expand Down
Loading

0 comments on commit 233acbc

Please sign in to comment.