Skip to content

Commit

Permalink
Renaming RState class to ReplicationState
Browse files Browse the repository at this point in the history
A small amount of common members and logic from its subclasses has been moved into the parent class. The original ReplicationState child class has been renamed SegmentReplicationState.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Mar 14, 2022
1 parent a5b5199 commit 065798f
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 82 deletions.
32 changes: 16 additions & 16 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -262,7 +262,7 @@ Runnable getGlobalCheckpointSyncer() {
@Nullable
private volatile RecoveryState recoveryState;

private volatile ReplicationState replicationState;
private volatile SegmentReplicationState segRepState;

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
Expand Down Expand Up @@ -413,7 +413,7 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
this.replicationState = new ReplicationState();
this.segRepState = new SegmentReplicationState();
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3082,19 +3082,15 @@ public void startRecovery(
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
segRepListener.onReplicationDone(replicationState);
segRepListener.onReplicationDone(segRepState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
segRepListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true);
}
}
);
Expand Down Expand Up @@ -3750,13 +3746,17 @@ public synchronized void onNewCheckpoint(
source,
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(ReplicationState state) {
public void onReplicationDone(SegmentReplicationState state) {
markReplicationComplete();
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
markReplicationComplete();
logger.error("Failure", e);
}
Expand All @@ -3767,20 +3767,20 @@ public void onReplicationFailure(ReplicationState state, ReplicationFailedExcept
}
}

public ReplicationState getReplicationState() {
return this.replicationState;
public SegmentReplicationState getReplicationState() {
return this.segRepState;
}

public void markAsReplicating() {
this.replicationState.setStage(ReplicationState.Stage.ACTIVE);
this.segRepState.setStage(SegmentReplicationState.Stage.ACTIVE);
}

public void markReplicationComplete() {
this.replicationState.setStage(ReplicationState.Stage.INACTIVE);
this.segRepState.setStage(SegmentReplicationState.Stage.INACTIVE);
}

private boolean isReplicating() {
return this.replicationState.getStage() == ReplicationState.Stage.ACTIVE;
return this.segRepState.getStage() == SegmentReplicationState.Stage.ACTIVE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -766,13 +766,13 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l
}

@Override
public void onReplicationDone(final ReplicationState state) {
public void onReplicationDone(final SegmentReplicationState state) {
logger.info("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(shardRouting, sendShardFailure, e);
logger.error("Shard setup failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.RState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -354,12 +354,12 @@ public static StartRecoveryRequest getStartRecoveryRequest(
public interface RecoveryListener extends ReplicationListener {

@Override
default void onDone(RState state) {
default void onDone(ReplicationState state) {
onRecoveryDone((RecoveryState) state);
}

@Override
default void onFailure(RState state, OpenSearchException e, boolean sendShardFailure) {
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
onRecoveryFailure((RecoveryState) state, (RecoveryFailedException) e, sendShardFailure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.RState;
import org.opensearch.indices.replication.common.ReplicationState;

import java.io.IOException;
import java.util.Locale;

/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContentFragment, Writeable, RState {
public class RecoveryState extends ReplicationState implements ToXContentFragment, Writeable {

public enum Stage {
INIT((byte) 0),
Expand Down Expand Up @@ -108,12 +108,8 @@ public static Stage fromId(byte id) {
}

private Stage stage;

private final RecoveryIndex index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final Timer timer;

private RecoverySource recoverySource;
private ShardId shardId;
@Nullable
Expand All @@ -126,6 +122,7 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
}

public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, RecoveryIndex index) {
super(index);
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
RecoverySource recoverySource = shardRouting.recoverySource();
assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null)
Expand All @@ -136,11 +133,8 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
this.sourceNode = sourceNode;
this.targetNode = targetNode;
stage = Stage.INIT;
this.index = index;
translog = new Translog();
verifyIndex = new VerifyIndex();
timer = new Timer();
timer.start();
}

public RecoveryState(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.Timer;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.RState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationCollection;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationTarget;
import org.opensearch.indices.replication.copy.SegmentReplicationPrimaryService;
import org.opensearch.indices.replication.copy.TrackShardRequest;
Expand Down Expand Up @@ -291,17 +291,17 @@ public void onFailure(Exception e) {
public interface SegmentReplicationListener extends ReplicationListener {

@Override
default void onDone(RState state) {
onReplicationDone((ReplicationState) state);
default void onDone(ReplicationState state) {
onReplicationDone((SegmentReplicationState) state);
}

@Override
default void onFailure(RState state, OpenSearchException e, boolean sendShardFailure) {
onReplicationFailure((ReplicationState) state, (ReplicationFailedException) e, sendShardFailure);
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
onReplicationFailure((SegmentReplicationState) state, (ReplicationFailedException) e, sendShardFailure);
}

void onReplicationDone(ReplicationState state);
void onReplicationDone(SegmentReplicationState state);

void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public interface ReplicationListener {

void onDone(RState state);
void onDone(ReplicationState state);

void onFailure(RState state, OpenSearchException e, boolean sendShardFailure);
void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication.common;

import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.Timer;

public class ReplicationState {

protected Timer timer;
protected RecoveryIndex index;

protected ReplicationState() {
// Empty default constructor for subclasses
}

protected ReplicationState(RecoveryIndex index) {
this.index = index;
timer = new Timer();
timer.start();
}

public Timer getTimer() {
return timer;
}

public RecoveryIndex getIndex() {
return index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted {

protected abstract void onFail(OpenSearchException e, boolean sendShardFailure);

public abstract RState state();
public abstract ReplicationState state();

public ReplicationTarget(String name, IndexShard indexShard, RecoveryIndex recoveryStateIndex, ReplicationListener listener) {
super(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,34 @@

import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.Timer;
import org.opensearch.indices.replication.common.RState;
import org.opensearch.indices.replication.common.ReplicationState;

public class ReplicationState implements RState {
public class SegmentReplicationState extends ReplicationState {

private Timer timer;
private RecoveryIndex index;
private Stage stage;

public ReplicationState(RecoveryIndex index) {
this.index = index;
this.timer = new Timer();
public SegmentReplicationState(RecoveryIndex index) {
super(index);
stage = Stage.INACTIVE;
timer.start();
}

public ReplicationState() {
public SegmentReplicationState() {
stage = Stage.INACTIVE;
}

public Timer getTimer() {
return timer;
public synchronized Stage getStage() {
return this.stage;
}

public RecoveryIndex getIndex() {
return index;
// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized void setStage(Stage stage) {
this.stage = stage;
}

/**
* THis class duplicates the purpose/functionality of {@link RecoveryState.Stage}
* so this temporary implementation simply aliases the enums from the other class.
* TODO Merge this class with the above Stage class once segrep lifecycle is finalized
*/
public enum Stage {
// TODO: Add more steps here.
Expand All @@ -58,13 +55,4 @@ public byte id() {
return id;
}
}

public synchronized Stage getStage() {
return this.stage;
}

// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized void setStage(Stage stage) {
this.stage = stage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class SegmentReplicationTarget extends ReplicationTarget {

private final ReplicationCheckpoint checkpoint;
private final PrimaryShardReplicationSource source;
private final ReplicationState state;
private final SegmentReplicationState state;

/**
* Creates a new replication target object that represents a replication to the provided source.
Expand All @@ -83,7 +83,7 @@ public SegmentReplicationTarget(
super("replication_status", indexShard, new RecoveryIndex(), listener);
this.checkpoint = checkpoint;
this.source = source;
state = new ReplicationState(recoveryStateIndex);
state = new SegmentReplicationState(recoveryStateIndex);
}

@Override
Expand Down Expand Up @@ -115,7 +115,7 @@ public void fail(ReplicationFailedException e, boolean sendShardFailure) {
}

@Override
public ReplicationState state() {
public SegmentReplicationState state() {
return state;
}

Expand Down

0 comments on commit 065798f

Please sign in to comment.