Skip to content

Commit

Permalink
Ignore replication checkpoints if we are already up to the published …
Browse files Browse the repository at this point in the history
…checkpoint.

This change ensures we do not start a replication sequence if we already have the checkpoint.
This changes the checkpoint published from the primary to the latest processed checkpoint instead of the latest persisted checkpoint.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 18, 2022
1 parent b1a0510 commit 9728020
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 10 deletions.
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen) throws IOException {};
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};

public long getProcessedLocalCheckpoint() { return 0L; };

/**
* A throttling class that can be activated, causing the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,15 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen) throws IOException {
// Installs a SegmentInfos snapshot copied from the primary onto this replica and
// refreshes readers so the new segments become visible. Only valid on replicas.
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
// Deserialize the primary's SegmentInfos from the wire bytes at the expected generation.
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(),
toIndexInput(infosBytes),
gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.setCurrentInfos(infos);
externalReaderManager.maybeRefresh();
// Mark the seqNo published with these infos as processed so later checkpoint
// comparisons (e.g. IndexShard.onNewCheckpoint) see this shard as caught up.
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}

private ChecksumIndexInput toIndexInput(byte[] input) {
Expand Down Expand Up @@ -2573,7 +2574,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
indexWriter.rollback();
if (engineConfig.isPrimary()) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
failOnTragicEvent(ex);
throw ex;
Expand Down Expand Up @@ -2926,6 +2929,7 @@ public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}

// Returns the highest sequence number this engine has processed, per the
// LocalCheckpointTracker (overrides the 0L default in the base Engine).
@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
Expand Down
25 changes: 20 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1516,11 +1516,11 @@ public SegmentInfos getLatestSegmentInfos() {
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(this.shardId, getOperationPrimaryTerm(), getLatestSegmentInfos().getGeneration(), getLocalCheckpoint());
return new ReplicationCheckpoint(this.shardId, getOperationPrimaryTerm(), getLatestSegmentInfos().getGeneration(), getProcessedLocalCheckpoint());
}

public void updateCurrentInfos(long gen, byte[] infosBytes) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen);
// Delegates to the engine to install new segment infos on this replica shard;
// seqNo is the primary's processed checkpoint carried with the published checkpoint.
public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
}

/**
Expand Down Expand Up @@ -3088,7 +3088,7 @@ public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Segrep recovery complete.");
postRecovery("Segrep complete.");
}
@Override
public void onFailure(Exception e) {
Expand Down Expand Up @@ -3701,12 +3701,27 @@ public void sync() throws IOException {
getEngine().syncTranslog();
}

// Builds this shard's current view of replication state: shard id, pending primary
// term, latest on-disk segment generation, and the engine's processed checkpoint.
// Compared against a published checkpoint in onNewCheckpoint to decide whether a
// replication round is needed at all.
private ReplicationCheckpoint getLocalReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, getPendingPrimaryTerm(), getLatestSegmentInfos().getGeneration(), getEngine().getProcessedLocalCheckpoint());
}

// Convenience accessor exposing the engine's processed local checkpoint
// (used e.g. by CopyState when publishing a checkpoint from the primary).
public long getProcessedLocalCheckpoint() {
return getEngine().getProcessedLocalCheckpoint();
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request,
final PrimaryShardReplicationSource source,
final SegmentReplicationService segmentReplicationService) {
logger.debug("Checkpoint received {}", request.getCheckpoint());
ReplicationCheckpoint localCheckpoint = getLocalReplicationCheckpoint();
logger.debug("Local Checkpoint {}", getLocalReplicationCheckpoint());
if (localCheckpoint.equals(request.getCheckpoint())) {
logger.debug("Ignore - Shard is already on checkpoint");
return;
}
if (state.equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
return;
Expand All @@ -3724,7 +3739,7 @@ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request,
@Override
public void onReplicationDone(ReplicationState state) {
finalizeReplication();
logger.info("Replication complete.");
logger.debug("Replication complete to {}", getLocalReplicationCheckpoint());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CopyState extends AbstractRefCounted {
super("replication-nrt-state");
this.segmentInfosRef = shard.getLatestSegmentInfosSafe();
final SegmentInfos segmentInfos = segmentInfosRef.getSegmentInfos();
this.checkpoint = new ReplicationCheckpoint(shard.shardId(), shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), shard.getLocalCheckpoint());
this.checkpoint = new ReplicationCheckpoint(shard.shardId(), shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), shard.getProcessedLocalCheckpoint());
this.metadataSnapshot = shard.store().getMetadata(segmentInfos);
ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
try (ByteBuffersIndexOutput tmpIndexOutput =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo,
final Directory directory = store().directory();
directory.sync(Arrays.asList(directory.listAll()));
}
indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes());
indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes(), checkpointInfo.getCheckpoint().getSeqNo());
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
Expand Down

0 comments on commit 9728020

Please sign in to comment.