Adding onNewCheckpoint to Start Replication on Replica Shard when Segment Replication is turned on #3540

Merged - 16 commits - Jun 22, 2022
Changes from 3 commits
@@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
     }

     @Override
-    public SegmentInfos getLatestSegmentInfos() {
+    protected SegmentInfos getLatestSegmentInfos() {
         OpenSearchDirectoryReader reader = null;
         try {
             reader = internalReaderManager.acquire();
server/src/main/java/org/opensearch/index/shard/IndexShard.java (13 additions, 8 deletions)
@@ -160,7 +160,6 @@
 import org.opensearch.indices.recovery.RecoveryListener;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
-import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.repositories.RepositoriesService;

@@ -1374,18 +1373,24 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
     }

     /**
-     * Returns the latest Replication Checkpoint that shard received
+     * Returns the latest segmentInfos
      */
-    public ReplicationCheckpoint getLatestReplicationCheckpoint() {
-        return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
+    public SegmentInfos getLatestSegmentInfos() {
+        return getEngine().getSegmentInfosSnapshot().get();
mch2 (Member) commented on Jun 20, 2022:

I missed this before - getSegmentInfosSnapshot returns a GatedCloseable; we should use try-with-resources.

        try (final GatedCloseable<SegmentInfos> snapshot = getEngine().getSegmentInfosSnapshot()) {
            return snapshot.get();
        }

There is also a getSegmentInfosSnapshot method in IndexShard; we don't have to fetch off the engine.

Another Member:

+1 to the note above. How is this different from the existing getSegmentInfosSnapshot?

    public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
        return getEngine().getSegmentInfosSnapshot();
    }

Member (Author):

It is the same as getSegmentInfosSnapshot(), so we can call that directly.
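A minimal sketch of that follow-up, assuming the snapshot is acquired via IndexShard's own getSegmentInfosSnapshot() and released with try-with-resources; the IOException wrapping is an assumption for illustration, not code from this PR:

    public SegmentInfos getLatestSegmentInfos() {
        // Acquire via IndexShard's own getSegmentInfosSnapshot() and release it
        // deterministically; GatedCloseable.close() can throw IOException.
        try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
            return snapshot.get();
        } catch (IOException e) {
            // Wrapping strategy is an assumption in this sketch.
            throw new UncheckedIOException(e);
        }
    }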

    }

     /**
-     * Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
+     * Returns the latest Replication Checkpoint that shard received
      */
-    public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
-        assert shardRouting.primary() == false;
-        // TODO
+    public ReplicationCheckpoint getLatestReplicationCheckpoint() {
+        final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
+        return new ReplicationCheckpoint(
Member:

For a NRTReplicationEngine that is starting up, it will not yet have anything returned from getLatestSegmentInfos until the first replication event completes. I think we need to null check here and return an "empty" ReplicationCheckpoint in that case, where gen and version could be SequenceNumbers.NO_OPS_PERFORMED.
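A sketch of that guard (a sketch only - whether getLatestSegmentInfos() can return null at startup, and the exact shape of the "empty" checkpoint, are assumptions based on this comment):

    public ReplicationCheckpoint getLatestReplicationCheckpoint() {
        final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
        if (latestSegmentInfos == null) {
            // No segments yet (engine still starting up): return an "empty" checkpoint
            // using SequenceNumbers.NO_OPS_PERFORMED for generation and version.
            return new ReplicationCheckpoint(
                shardId,
                getOperationPrimaryTerm(),
                SequenceNumbers.NO_OPS_PERFORMED,
                getProcessedLocalCheckpoint(),
                SequenceNumbers.NO_OPS_PERFORMED
            );
        }
        return new ReplicationCheckpoint(
            this.shardId,
            getOperationPrimaryTerm(),
            latestSegmentInfos.getGeneration(),
            getProcessedLocalCheckpoint(),
            latestSegmentInfos.getVersion()
        );
    }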

+            this.shardId,
+            getOperationPrimaryTerm(),
+            latestSegmentInfos.getGeneration(),
+            getProcessedLocalCheckpoint(),
+            latestSegmentInfos.getVersion()
+        );
+    }

@@ -14,9 +14,11 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.common.Nullable;
+import org.opensearch.common.inject.Inject;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.shard.IndexEventListener;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardState;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.recovery.RecoverySettings;

@@ -38,7 +40,7 @@
  *
  * @opensearch.internal
  */
-public final class SegmentReplicationTargetService implements IndexEventListener {
+public class SegmentReplicationTargetService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);

@@ -58,6 +60,7 @@ public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk";
}

@Inject
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
@@ -84,6 +87,33 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
}
}

/**
* Invoked when a new checkpoint is received from a primary shard.
* It checks if a new checkpoint should be processed or not and starts replication if needed.
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexShard replica shard on which checkpoint is received
*/
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) {
Member:

If this class is no longer final, this method can be.

Member (Author):

This is still not possible, as shardOperationOnReplica() in PublishCheckpointAction still needs to assert onNewCheckpoint() in tests; we would possibly need mockito-inline to make this method, or the entire class, final.
logger.trace("Checkpoint received {}", () -> requestCheckpoint);
if (shouldProcessCheckpoint(requestCheckpoint, indexShard)) {
logger.trace("Processing new checkpoint {}", requestCheckpoint);
startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace("Replication complete to {}", indexShard.getLatestReplicationCheckpoint());
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
}
});

}
Collaborator:

Let's remove the trace logging if it's not needed, and add error logging on shard failure?
}
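A sketch of the failure-path logging asked for above (an assumption about the eventual change, not code in this commit):

                @Override
                public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
                    if (sendShardFailure) {
                        // Record why the shard is being failed so operators can see it in the logs.
                        logger.error("segment replication failure for shard " + indexShard.shardId(), e);
                        indexShard.failShard("replication failure", e);
                    }
                }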

public void startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
@@ -98,6 +128,38 @@ public void startReplication(final SegmentReplicationTarget target) {
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @param indexShard replica shard on which checkpoint is received
* @return true if checkpoint should be processed
*/
private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, IndexShard indexShard) {
if (indexShard.state().equals(IndexShardState.STARTED) == false) {
Collaborator:

For my understanding, what happens to a shard in RECOVERING state?

Member (Author):

We will take care of the RECOVERING-state logic in our next PR. Shards in RECOVERING state will never receive these new checkpoints: we'll be updating the recovery process to first ping the primary to have the local shard tracked, and then force a replication event.

logger.trace("Ignoring new replication checkpoint - shard is not started {}", indexShard.state());
return false;
}
ReplicationCheckpoint localCheckpoint = indexShard.getLatestReplicationCheckpoint();
if (onGoingReplications.isShardReplicating(indexShard.shardId())) {
logger.trace("Ignoring new replication checkpoint - shard is currently replicating to a checkpoint");
return false;
}
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
localCheckpoint,
requestCheckpoint
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint);
return false;
}
return true;
}
Collaborator:

Should this method be in IndexShard instead?

Member (Author):

Yes, thanks for catching this. I have moved this to IndexShard in a new commit.


/**
* Listener that runs on changes in Replication state
*
@@ -28,6 +28,7 @@
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardClosedException;
 import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.tasks.Task;
 import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction<
public static final String ACTION_NAME = "indices:admin/publishCheckpoint";
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);

private final SegmentReplicationTargetService replicationService;

@Inject
public PublishCheckpointAction(
Settings settings,
@@ -60,7 +63,8 @@ public PublishCheckpointAction(
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
-        ActionFilters actionFilters
+        ActionFilters actionFilters,
+        SegmentReplicationTargetService targetService
) {
super(
settings,
@@ -75,6 +79,7 @@ public PublishCheckpointAction(
PublishCheckpointRequest::new,
ThreadPool.Names.REFRESH
);
this.replicationService = targetService;
}

@Override
@@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
-                replica.onNewCheckpoint(request);
+                replicationService.onNewCheckpoint(request.getCheckpoint(), replica);
}
return new ReplicaResult();
});
@@ -115,7 +115,7 @@ public int hashCode() {
 * Checks if other is ahead of the current replication point by comparing segmentInfosVersion. Returns true for null
*/
public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
-        return other == null || segmentInfosVersion > other.getSegmentInfosVersion();
+        return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm();
}

@Override
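A quick illustration of the updated predicate (a sketch; the constructor argument order shardId, primaryTerm, segmentsGen, seqNo, segmentInfosVersion is taken from this PR's own usage of ReplicationCheckpoint):

    // Checkpoint from a new primary: higher primary term, lower segmentInfosVersion.
    ReplicationCheckpoint fromNewPrimary = new ReplicationCheckpoint(shardId, 2, 5, 10, 7);
    // Checkpoint from the old primary: lower primary term, higher segmentInfosVersion.
    ReplicationCheckpoint fromOldPrimary = new ReplicationCheckpoint(shardId, 1, 5, 10, 9);

    // With the added primaryTerm clause, the new primary's checkpoint counts as
    // "ahead" even though its segmentInfosVersion is lower.
    assert fromNewPrimary.isAheadOf(fromOldPrimary);
    // Note the two clauses are independent ORs, so this also holds (9 > 7).
    assert fromOldPrimary.isAheadOf(fromNewPrimary);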
@@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) {
return cancelled;
}

/**
* check if a shard is currently replicating
*
* @param shardId shardId for which to check if replicating
* @return true if shard is currently replicating
*/
public boolean isShardReplicating(ShardId shardId) {
return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId));
Member:

Nitpick - this is a problem to be solved down the line: isShardReplicating seems like a frequent check, and constantly flattening the map to a stream of values may end up being performance-intensive. We should consider whether we could make the shardId the key of the map to speed up this check.

Member (Author):

Yes, we can do that, but it has to change in multiple places, so I'm thinking of doing that in a different PR.

}
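A sketch of the shardId-keyed lookup suggested above (hypothetical: the field name and where it would be maintained are assumptions, not code from this PR):

    // Hypothetical secondary index, kept in sync wherever onGoingTargetEvents
    // adds or removes a target, so the per-shard check becomes O(1).
    private final Map<ShardId, Long> replicationIdsByShard = new ConcurrentHashMap<>();

    public boolean isShardReplicating(ShardId shardId) {
        return replicationIdsByShard.containsKey(shardId);
    }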

/**
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
server/src/main/java/org/opensearch/node/Node.java (11 additions, 0 deletions)

@@ -37,6 +37,8 @@
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.SetOnce;
 import org.opensearch.index.IndexingPressureService;
+import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.watcher.ResourceWatcherService;
 import org.opensearch.Assertions;
 import org.opensearch.Build;
@@ -932,6 +934,15 @@ protected Node(
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
b.bind(SegmentReplicationTargetService.class)
mch2 (Member) commented on Jun 18, 2022:

If you are binding here, you do not need @Inject on SegmentReplicationTargetService's constructor.

Member:

Shouldn't this binding also be gated by the feature flag, like SegmentReplicationSourceService below?

Member (Author):

Yes, I removed @Inject from SegmentReplicationTargetService's constructor. And yes, this should also be gated by the feature flag, but that needs some changes to the SegmentReplicationCheckpointPublisher binding in IndicesModule; I will add them in the next commit.

.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
)
);
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
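A sketch of the feature-flag gating discussed above (an assumption: the flag name mirrors how segment replication is gated elsewhere in the codebase, which is not shown in this diff):

            if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
                // Only bind the target service when the experimental feature is on.
                b.bind(SegmentReplicationTargetService.class)
                    .toInstance(
                        new SegmentReplicationTargetService(
                            threadPool,
                            recoverySettings,
                            transportService,
                            new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
                        )
                    );
            }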
@@ -9,6 +9,7 @@
 package org.opensearch.indices.replication;

 import org.junit.Assert;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
@@ -39,7 +40,7 @@ public void setUp() throws Exception {
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
final TransportService transportService = mock(TransportService.class);
-        indexShard = newShard(false, settings);
+        indexShard = newStartedShard(false, settings);
checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
@@ -48,12 +49,6 @@
sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory);
}

-    @Override
-    public void tearDown() throws Exception {
-        closeShards(indexShard);
-        super.tearDown();
-    }

public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
@@ -111,6 +106,68 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
closeShards(indexShard);
}

public void testAlreadyOnNewCheckpoint() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testShardAlreadyReplicating() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
replicationSource,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
);
final SegmentReplicationTarget spyTarget = Mockito.spy(target);
spy.startReplication(spyTarget);
spy.onNewCheckpoint(checkpoint, indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testNewCheckpointBehindCurrentCheckpoint() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
spy.onNewCheckpoint(checkpoint, indexShard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(indexShard);
}

public void testShardNotStarted() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
IndexShard shard = newShard(false);
spy.onNewCheckpoint(checkpoint, shard);
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(shard);
closeShards(indexShard);
}

public void testShouldProcessCheckpoint() throws IOException {
allowShardFailures();
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint();
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint(
cp.getShardId(),
cp.getPrimaryTerm(),
cp.getSegmentsGen(),
cp.getSeqNo(),
cp.getSegmentInfosVersion() + 1
);
spy.onNewCheckpoint(newCheckpoint, spyShard);
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass(
SegmentReplicationTargetService.SegmentReplicationListener.class
);
verify(spy, times(1)).startReplication(any(), any(), captor.capture());
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue();
listener.onFailure(new SegmentReplicationState(), new OpenSearchException("testing"), true);
verify(spyShard).failShard(any(), any());
closeShard(indexShard, false);

}

public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,