Skip to content

Commit

Permalink
Adding onNewCheckpoint and it's test to start replication. SCheck for…
Browse files Browse the repository at this point in the history
… latestcheckpoint and replaying logic is removed from this commit and will be added in a different PR

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Jun 8, 2022
1 parent 81a77aa commit 8b288d8
Showing 12 changed files with 184 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ public final EngineConfig config() {
* Return the latest active SegmentInfos from the engine.
* @return {@link SegmentInfos}
*/
protected abstract SegmentInfos getLatestSegmentInfos();
public abstract SegmentInfos getLatestSegmentInfos();

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
Original file line number Diff line number Diff line change
@@ -431,7 +431,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
protected SegmentInfos getLatestSegmentInfos() {
public SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}

Original file line number Diff line number Diff line change
@@ -271,7 +271,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
protected SegmentInfos getLatestSegmentInfos() {
public SegmentInfos getLatestSegmentInfos() {
return lastCommittedSegmentInfos;
}

28 changes: 20 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
@@ -160,7 +160,6 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
@@ -1374,18 +1373,31 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the lastest Replication Checkpoint that shard received
* Returns the IndexShardSate of current shard
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
public IndexShardState getState() {
return this.state;
}

/**
* Returns the lastest segmentInfos
*/
public SegmentInfos getLatestSegmentInfos() {
return getEngine().getLatestSegmentInfos();
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
* Returns the lastest Replication Checkpoint that shard received
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
return new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
latestSegmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
latestSegmentInfos.getVersion()
);
}

/**
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
@@ -159,6 +160,7 @@ public class RecoverySettings {

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@Inject
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
@@ -27,6 +28,7 @@ public class SegmentReplicationSourceFactory {
private RecoverySettings recoverySettings;
private ClusterService clusterService;

@Inject
public SegmentReplicationSourceFactory(
TransportService transportService,
RecoverySettings recoverySettings,
Original file line number Diff line number Diff line change
@@ -14,9 +14,11 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoverySettings;
@@ -38,7 +40,7 @@
*
* @opensearch.internal
*/
public final class SegmentReplicationTargetService implements IndexEventListener {
public class SegmentReplicationTargetService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);

@@ -58,6 +60,7 @@ public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk";
}

@Inject
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
@@ -84,6 +87,31 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
}
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) {
logger.trace("Checkpoint received {}", () -> requestCheckpoint);
if (shouldProcessCheckpoint(requestCheckpoint, indexShard)) {
logger.trace("Processing new checkpoint {}", requestCheckpoint);
startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace("Replication complete to {}", indexShard.getLatestReplicationCheckpoint());
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
}
});

}
}

public void startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
@@ -98,6 +126,35 @@ public void startReplication(final SegmentReplicationTarget target) {
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexshard replica shard on which checkpoint is received
* @return true if checkpoint should be processed
*/
private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexshard) {
if (indexshard.getState().equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {}", indexshard.getState());
return false;
}
ReplicationCheckpoint localCheckpoint = indexshard.getLatestReplicationCheckpoint();
logger.debug("Local Checkpoint {}", indexshard.getLatestReplicationCheckpoint());
if (onGoingReplications.isShardReplicating(indexshard.shardId())) {
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return false;
}
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint);
return false;
}
return true;
}

/**
* Listener that runs on changes in Replication state
*
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
@@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction<
public static final String ACTION_NAME = "indices:admin/publishCheckpoint";
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);

private final SegmentReplicationTargetService replicationService;

@Inject
public PublishCheckpointAction(
Settings settings,
@@ -60,7 +63,8 @@ public PublishCheckpointAction(
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
ActionFilters actionFilters,
SegmentReplicationTargetService targetService
) {
super(
settings,
@@ -75,6 +79,7 @@ public PublishCheckpointAction(
PublishCheckpointRequest::new,
ThreadPool.Names.REFRESH
);
this.replicationService = targetService;
}

@Override
@@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replica.onNewCheckpoint(request);
replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
return new ReplicaResult();
});
Original file line number Diff line number Diff line change
@@ -115,7 +115,7 @@ public int hashCode() {
* Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null
*/
public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
return other == null || segmentInfosVersion > other.getSegmentInfosVersion();
return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm();
}

@Override
Original file line number Diff line number Diff line change
@@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) {
return cancelled;
}

/**
* check if a shard is currently replicating
*
* @param shardId shardId for which to check if replicating
* @return true if shard is currently replicating
*/
public boolean isShardReplicating(ShardId shardId) {
return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId));
}

/**
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.junit.Assert;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
@@ -39,7 +40,7 @@ public void setUp() throws Exception {
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
final TransportService transportService = mock(TransportService.class);
indexShard = newShard(false, settings);
indexShard = newStartedShard(false, settings);
checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
@@ -48,12 +49,6 @@ public void setUp() throws Exception {
sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory);
}

@Override
public void tearDown() throws Exception {
closeShards(indexShard);
super.tearDown();
}

public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
@@ -111,6 +106,68 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept
closeShards(indexShard);
}

public void testAlreadyOnNewCheckpoint() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testShardAlreadyReplicating() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
replicationSource,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
);
final SegmentReplicationTarget spyTarget = Mockito.spy(target);
spy.startReplication(spyTarget);
spy.onNewCheckpoint(checkpoint, indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testNewCheckpointBehindCurrentCheckpoint() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
spy.onNewCheckpoint(checkpoint, indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testShardNotStarted() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
IndexShard shard = newShard(false);
spy.onNewCheckpoint(checkpoint, shard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(shard);
closeShards(indexShard);
}

public void testShouldProcessCheckpoint() throws IOException {
allowShardFailures();
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 1
);
spy.onNewCheckpoint(newCheckpoint, spyShard);
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
SegmentReplicationTargetService.SegmentReplicationListener.class
);
verify(spy, times(1)).startReplication(any(), any(), captor.capture());
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
listener.onFailure(new SegmentReplicationState(), new OpenSearchException("testing"), true);
verify(spyShard).failShard(any(), any());
closeShard(indexShard, false);

}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
Loading

0 comments on commit 8b288d8

Please sign in to comment.