Skip to content

Commit

Permalink
Init global checkpoint after copy commit in peer recovery (#40823)
Browse files Browse the repository at this point in the history
Today a new replica of a closed index does not have a safe commit
invariant when its engine is opened because we won't initialize the
global checkpoint on a recovering replica until the finalize step. With
this change, we can achieve that property by creating a new translog
with the global checkpoint from the primary at the end of phase 1.
  • Loading branch information
dnhatn authored Apr 12, 2019
1 parent 79c7a57 commit e9999df
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
// During a peer-recovery the global checkpoint is not known and up to date when the engine
// is created, so we only check the max seq no / global checkpoint coherency when the global
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated();
if (indexVersionCreated.onOrAfter(Version.V_7_1_0) ||
(globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) {
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -540,7 +540,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest;
Expand All @@ -29,37 +31,32 @@

public class RecoveryCleanFilesRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private final long recoveryId;
private final ShardId shardId;
private final Store.MetadataSnapshot snapshotFiles;
private final int totalTranslogOps;
private final long globalCheckpoint;

private Store.MetadataSnapshot snapshotFiles;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

public RecoveryCleanFilesRequest() {
}

RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) {
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.snapshotFiles = snapshotFiles;
this.totalTranslogOps = totalTranslogOps;
this.globalCheckpoint = globalCheckpoint;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
RecoveryCleanFilesRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
snapshotFiles = new Store.MetadataSnapshot(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

@Override
Expand All @@ -69,13 +66,28 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
snapshotFiles.writeTo(out);
out.writeVInt(totalTranslogOps);
if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
out.writeZLong(globalCheckpoint);
}
}

public Store.MetadataSnapshot sourceMetaSnapshot() {
return snapshotFiles;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}

public long getGlobalCheckpoint() {
return globalCheckpoint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
startingSeqNo = 0;
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getGlobalCheckpoint(), () -> estimateNumOps);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
Expand Down Expand Up @@ -332,7 +332,7 @@ static final class SendFileResult {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
cancellableThreads.checkForCancel();
// Total size of segment files that are recovered
long totalSize = 0;
Expand Down Expand Up @@ -422,7 +422,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
// are deleted
try {
cancellableThreads.executeIO(() ->
recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
} catch (RemoteTransportException | IOException targetException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
ActionListener.completeWith(listener, () -> {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
assert indexShard.getGlobalCheckpoint() >= indexShard.seqNoStats().getMaxSeqNo() ||
indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0)
: "global checkpoint is not initialized [" + indexShard.seqNoStats() + "]";
return null;
});
}
Expand Down Expand Up @@ -382,7 +385,7 @@ public void receiveFileInfo(List<String> phase1FileNames,
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
Expand All @@ -395,10 +398,11 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
store.ensureIndexHasHistoryUUID();
}
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO))
|| indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0) :
"invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]";
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ void receiveFileInfo(List<String> phase1FileNames,
/**
* After all source files has been sent over, this command is sent to the target so it can clean any local
* files that are not part of the source store
*
* @param totalTranslogOps an update number of translog operations that will be replayed later on
* @param sourceMetaData meta data of the source store
* @param globalCheckpoint the global checkpoint on the primary
* @param sourceMetaData meta data of the source store
*/
void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;

/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps),
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public void run() {
Future<Void> future = shards.asyncRecoverReplica(replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
latch.countDown();
try {
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,9 @@ public void indexTranslogOperations(
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
blockIfNeeded(RecoveryState.Stage.INDEX);
super.cleanFiles(totalTranslogOps, sourceMetaData);
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
for (Thread sender : senders) {
sender.join();
}
recoveryTarget.cleanFiles(0, sourceSnapshot);
recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
recoveryTarget.decRef();
Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
between(1, 8)) {

@Override
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
phase1Called.set(true);
return super.phase1(snapshot, translogOps);
return super.phase1(snapshot, globalCheckpoint, translogOps);
}

@Override
Expand Down Expand Up @@ -715,7 +715,7 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -42,9 +43,11 @@
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -323,7 +326,18 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
}
IndexShard replicaShard = newShard(primaryShard.shardId(), false);
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
recoverReplica(replicaShard, primaryShard, true);
recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
assertThat(replicaShard.getGlobalCheckpoint(), equalTo(primaryShard.getGlobalCheckpoint()));
}
@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
assertThat(globalCheckpoint, equalTo(primaryShard.getGlobalCheckpoint()));
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}
}, true, true);
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, sourceMetaData);
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
}

@Override
Expand Down

0 comments on commit e9999df

Please sign in to comment.