diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ed2558dee2fef..0da976927a16c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -167,7 +167,7 @@ import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; -import org.opensearch.indices.replication.copy.ReplicationState; +import org.opensearch.indices.replication.copy.SegmentReplicationState; import org.opensearch.indices.replication.copy.TrackShardResponse; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -262,7 +262,7 @@ Runnable getGlobalCheckpointSyncer() { @Nullable private volatile RecoveryState recoveryState; - private volatile ReplicationState replicationState; + private volatile SegmentReplicationState segRepState; private final RecoveryStats recoveryStats = new RecoveryStats(); private final MeanMetric refreshMetric = new MeanMetric(); @@ -413,7 +413,7 @@ public boolean shouldCache(Query query) { this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); - this.replicationState = new ReplicationState(); + this.segRepState = new SegmentReplicationState(); } public ThreadPool getThreadPool() { @@ -3082,7 +3082,7 @@ public void startRecovery( new ActionListener() { @Override public void onResponse(TrackShardResponse unused) { - segRepListener.onReplicationDone(replicationState); + segRepListener.onReplicationDone(segRepState); recoveryState.getIndex().setFileDetailsComplete(); finalizeRecovery(); postRecovery("Shard setup complete."); @@ -3090,11 +3090,7 @@ public void onResponse(TrackShardResponse unused) { @Override public void onFailure(Exception e) { - segRepListener.onReplicationFailure( - replicationState, - new ReplicationFailedException(indexShard, e), - true - ); + segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true); } } ); @@ -3750,13 +3746,17 @@ public synchronized void onNewCheckpoint( source, new SegmentReplicationReplicaService.SegmentReplicationListener() { @Override - public void onReplicationDone(ReplicationState state) { + public void onReplicationDone(SegmentReplicationState state) { markReplicationComplete(); logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); } @Override - public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { markReplicationComplete(); logger.error("Failure", e); } @@ -3767,20 +3767,20 @@ public void onReplicationFailure(ReplicationState state, ReplicationFailedExcept } } - public ReplicationState getReplicationState() { - return this.replicationState; + public SegmentReplicationState getReplicationState() { + return this.segRepState; } public void markAsReplicating() { - this.replicationState.setStage(ReplicationState.Stage.ACTIVE); + this.segRepState.setStage(SegmentReplicationState.Stage.ACTIVE); } public void markReplicationComplete() { - this.replicationState.setStage(ReplicationState.Stage.INACTIVE); + this.segRepState.setStage(SegmentReplicationState.Stage.INACTIVE); } private boolean isReplicating() { - return this.replicationState.getStage() == ReplicationState.Stage.ACTIVE; + return this.segRepState.getStage() == SegmentReplicationState.Stage.ACTIVE; } /** diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 88c60010ddfdb..e09add708b14c 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -83,7 +83,7 @@ import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationFailedException; -import org.opensearch.indices.replication.copy.ReplicationState; +import org.opensearch.indices.replication.copy.SegmentReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; import org.opensearch.snapshots.SnapshotShardsService; @@ -766,13 +766,13 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l } @Override - public void onReplicationDone(final ReplicationState state) { + public void onReplicationDone(final SegmentReplicationState state) { logger.info("Shard setup complete, ready for segment copy."); shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER); } @Override - public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { handleRecoveryFailure(shardRouting, sendShardFailure, e); logger.error("Shard setup failed", e); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 03ba6cf2aeb47..bf73aa2692e0c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -71,7 +71,7 @@ import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.opensearch.indices.replication.common.ReplicationListener; -import org.opensearch.indices.replication.common.RState; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -354,12 +354,12 @@ public static StartRecoveryRequest getStartRecoveryRequest( public interface RecoveryListener extends ReplicationListener { @Override - default void onDone(RState state) { + default void onDone(ReplicationState state) { onRecoveryDone((RecoveryState) state); } @Override - default void onFailure(RState state, OpenSearchException e, boolean sendShardFailure) { + default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { onRecoveryFailure((RecoveryState) state, (RecoveryFailedException) e, sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index 19d3bb24be96b..e1288d6d31e35 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -45,7 +45,7 @@ import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.replication.common.RState; +import org.opensearch.indices.replication.common.ReplicationState; import java.io.IOException; import java.util.Locale; @@ -53,7 +53,7 @@ /** * Keeps track of state related to shard recovery. */ -public class RecoveryState implements ToXContentFragment, Writeable, RState { +public class RecoveryState extends ReplicationState implements ToXContentFragment, Writeable { public enum Stage { INIT((byte) 0), @@ -108,12 +108,8 @@ public static Stage fromId(byte id) { } private Stage stage; - - private final RecoveryIndex index; private final Translog translog; private final VerifyIndex verifyIndex; - private final Timer timer; - private RecoverySource recoverySource; private ShardId shardId; @Nullable @@ -126,6 +122,7 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla } public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, RecoveryIndex index) { + super(index); assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; RecoverySource recoverySource = shardRouting.recoverySource(); assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null) @@ -136,11 +133,8 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla this.sourceNode = sourceNode; this.targetNode = targetNode; stage = Stage.INIT; - this.index = index; translog = new Translog(); verifyIndex = new VerifyIndex(); - timer = new Timer(); - timer.start(); } public RecoveryState(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index aa8df620f1412..1d3604edd6d15 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -37,12 +37,12 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.Timer; import org.opensearch.indices.replication.common.ReplicationListener; -import org.opensearch.indices.replication.common.RState; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationCollection; import org.opensearch.indices.replication.copy.ReplicationFailedException; -import org.opensearch.indices.replication.copy.ReplicationState; +import org.opensearch.indices.replication.copy.SegmentReplicationState; import org.opensearch.indices.replication.copy.SegmentReplicationTarget; import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService; import org.opensearch.indices.replication.copy.TrackShardRequest; @@ -291,17 +291,17 @@ public void onFailure(Exception e) { public interface SegmentReplicationListener extends ReplicationListener { @Override - default void onDone(RState state) { - onReplicationDone((ReplicationState) state); + default void onDone(ReplicationState state) { + onReplicationDone((SegmentReplicationState) state); } @Override - default void onFailure(RState state, OpenSearchException e, boolean sendShardFailure) { - onReplicationFailure((ReplicationState) state, (ReplicationFailedException) e, sendShardFailure); + default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + onReplicationFailure((SegmentReplicationState) state, (ReplicationFailedException) e, sendShardFailure); } - void onReplicationDone(ReplicationState state); + void onReplicationDone(SegmentReplicationState state); - void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure); + void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/RState.java b/server/src/main/java/org/opensearch/indices/replication/common/RState.java deleted file mode 100644 index bfc57d9b6686f..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/common/RState.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.common; - -public interface RState { - - // TODO Add APIs here -} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java index 4c5d20580d113..f0972176169bd 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java @@ -12,7 +12,7 @@ public interface ReplicationListener { - void onDone(RState state); + void onDone(ReplicationState state); - void onFailure(RState state, OpenSearchException e, boolean sendShardFailure); + void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java new file mode 100644 index 0000000000000..8ea9162067513 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.indices.recovery.RecoveryIndex; +import org.opensearch.indices.recovery.Timer; + +public class ReplicationState { + + protected Timer timer; + protected RecoveryIndex index; + + protected ReplicationState() { + // Empty default constructor for subclasses + } + + protected ReplicationState(RecoveryIndex index) { + this.index = index; + timer = new Timer(); + timer.start(); + } + + public Timer getTimer() { + return timer; + } + + public RecoveryIndex getIndex() { + return index; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 151b4a4011f5e..18601e97ad7d8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -51,7 +51,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected abstract void onFail(OpenSearchException e, boolean sendShardFailure); - public abstract RState state(); + public abstract ReplicationState state(); public ReplicationTarget(String name, IndexShard indexShard, RecoveryIndex recoveryStateIndex, ReplicationListener listener) { super(name); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationState.java similarity index 71% rename from server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java rename to server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationState.java index 0721756b7ea8f..3ceda8775826b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationState.java @@ -10,37 +10,34 @@ import org.opensearch.indices.recovery.RecoveryIndex; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.recovery.Timer; -import org.opensearch.indices.replication.common.RState; +import org.opensearch.indices.replication.common.ReplicationState; -public class ReplicationState implements RState { +public class SegmentReplicationState extends ReplicationState { - private Timer timer; - private RecoveryIndex index; private Stage stage; - public ReplicationState(RecoveryIndex index) { - this.index = index; - this.timer = new Timer(); + public SegmentReplicationState(RecoveryIndex index) { + super(index); stage = Stage.INACTIVE; - timer.start(); } - public ReplicationState() { + public SegmentReplicationState() { stage = Stage.INACTIVE; } - public Timer getTimer() { - return timer; + public synchronized Stage getStage() { + return this.stage; } - public RecoveryIndex getIndex() { - return index; + // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe + public synchronized void setStage(Stage stage) { + this.stage = stage; } /** * THis class duplicates the purpose/functionality of {@link RecoveryState.Stage} * so this temporary implementation simply aliases the enums from the other class. + * TODO Merge this class with the above Stage class once segrep lifecycle is finalized */ public enum Stage { // TODO: Add more steps here. @@ -58,13 +55,4 @@ public byte id() { return id; } } - - public synchronized Stage getStage() { - return this.stage; - } - - // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe - public synchronized void setStage(Stage stage) { - this.stage = stage; - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index e4729166654e8..d3179692a2bb4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -65,7 +65,7 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; private final PrimaryShardReplicationSource source; - private final ReplicationState state; + private final SegmentReplicationState state; /** * Creates a new replication target object that represents a replication to the provided source. @@ -83,7 +83,7 @@ public SegmentReplicationTarget( super("replication_status", indexShard, new RecoveryIndex(), listener); this.checkpoint = checkpoint; this.source = source; - state = new ReplicationState(recoveryStateIndex); + state = new SegmentReplicationState(recoveryStateIndex); } @Override @@ -115,7 +115,7 @@ public void fail(ReplicationFailedException e, boolean sendShardFailure) { } @Override - public ReplicationState state() { + public SegmentReplicationState state() { return state; }