Restore local history from translog on promotion (#33616)
If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates #33473
Relates #32867
dnhatn authored and kcm committed Oct 30, 2018
1 parent cce7dda commit e3013a3
Showing 7 changed files with 68 additions and 81 deletions.
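
For orientation, a toy, self-contained sketch of the scenario in the commit message. Everything in it is made up for illustration (plain Java, not the Elasticsearch types changed below): the replica's index is rolled back to the global checkpoint when a new primary takes over, and replaying its own translog above that checkpoint brings the acknowledged writes back if the shard is later promoted.

// Toy sketch only: models acknowledged writes as sequence numbers and the
// Lucene index as a plain list. Not the actual Elasticsearch classes.
import java.util.ArrayList;
import java.util.List;

final class PromotionReplayDemo {

    public static void main(String[] args) {
        // Acknowledged writes, identified by sequence numbers 0..4, all present in the translog.
        final List<Long> translog = List.of(0L, 1L, 2L, 3L, 4L);
        final long globalCheckpoint = 2;                 // safely replicated up to seqno 2

        // Step 1: the primary-term bump resets the local index to the global checkpoint,
        // reverting seqno 3 and 4 from the index (they remain in the translog).
        final List<Long> luceneIndex = new ArrayList<>();
        for (final long seqNo : translog) {
            if (seqNo <= globalCheckpoint) {
                luceneIndex.add(seqNo);
            }
        }

        // Step 2: the new primary fails before the resync completes and this shard is
        // promoted, so it restores the reverted operations by replaying its translog.
        for (final long seqNo : translog) {
            if (seqNo > globalCheckpoint) {
                luceneIndex.add(seqNo);
            }
        }

        System.out.println("restored index: " + luceneIndex);   // [0, 1, 2, 3, 4]
        assert luceneIndex.containsAll(translog) : "acknowledged writes must not be lost";
    }
}
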
@@ -111,7 +111,6 @@ protected void doRun() throws Exception {
return future;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
public void testRecoveryWithConcurrentIndexing() throws Exception {
final String index = "recovery_with_concurrent_indexing";
Response response = client().performRequest(new Request("GET", "_nodes"));
@@ -184,7 +183,6 @@ private String getNodeId(Predicate<Version> versionPredicate) throws IOException
}


@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
public void testRelocationWithConcurrentIndexing() throws Exception {
final String index = "relocation_with_concurrent_indexing";
switch (CLUSTER_TYPE) {

@@ -1720,12 +1720,12 @@ public interface Warmer {
public abstract void deactivateThrottling();

/**
* Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
* promotion.
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
*
* @throws IOException if an I/O exception occurred reading the translog
* @return the number of translog operations have been recovered
*/
public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
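
The replay itself now comes in through the TranslogRecoveryRunner parameter. For readers without the rest of Engine.java at hand, that callback is, to the best of my recollection of this branch, a small functional interface along the following lines (a sketch that leans on the surrounding Engine and Translog types, not a verbatim copy; Engine.java is authoritative).

// Assumed shape of Engine.TranslogRecoveryRunner; verify against Engine.java.
@FunctionalInterface
public interface TranslogRecoveryRunner {
    /**
     * Replays the operations in the snapshot against the given engine and
     * returns the number of operations that were recovered.
     */
    int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}
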

@@ -332,17 +332,12 @@ protected int getRefCount(IndexSearcher reference) {
}

@Override
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
}
}
return translogRecoveryRunner.run(this, snapshot);
}
}
}

@@ -344,7 +344,8 @@ public void rollTranslogGeneration() {
}

@Override
public void restoreLocalCheckpointFromTranslog() {
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
return 0;
}

@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java: 23 changes (12 additions & 11 deletions)
@@ -494,17 +494,16 @@ public void updateShardState(final ShardRouting newRouting,
try {
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
* its local checkpoint tracker was reset during the primary term transition. In particular, the local
* checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
* tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
* replaying the translog and marking any operations there are completed.
* If this shard was serving as a replica shard when another shard was promoted to primary then
* its Lucene index was reset during the primary term transition. In particular, the Lucene index
* on this shard was reset to the global checkpoint and the operations above the local checkpoint
* were reverted. If the other shard that was promoted to primary subsequently fails before the
* primary/replica re-sync completes successfully and we are now being promoted, we have to restore
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
*/
final Engine engine = getEngine();
engine.restoreLocalCheckpointFromTranslog();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
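
The lambda above plugs the engine's new hook into the shard's existing translog-replay path, with Origin.LOCAL_RESET marking the replayed operations; the trailing () -> {} looks like a per-operation progress callback that is simply not needed here. As a rough sketch of what that runner boils down to (simplified and hedged: applyTranslogOperation stands in for the shard's real replay helper and is not a verbatim signature):

// Simplified sketch, not the actual IndexShard#runTranslogRecovery: drain the
// translog snapshot, re-apply each operation to the resetting engine with the
// LOCAL_RESET origin, and report how many operations were replayed.
private int replayLocalHistory(Engine engine, Translog.Snapshot snapshot) throws IOException {
    int opsRecovered = 0;
    Translog.Operation operation;
    while ((operation = snapshot.next()) != null) {
        // Stand-in call for the shard-level helper that indexes/deletes/no-ops
        // the operation on the engine.
        applyTranslogOperation(engine, operation, Engine.Operation.Origin.LOCAL_RESET);
        opsRecovered++;
    }
    return opsRecovered;
}
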
@@ -1452,9 +1451,11 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
} else {
if (origin == Engine.Operation.Origin.PRIMARY) {
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
} else if (origin == Engine.Operation.Origin.REPLICA) {
assert assertReplicationTarget();
} else {
assert origin == Engine.Operation.Origin.LOCAL_RESET;
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
}
if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");

@@ -148,6 +148,7 @@
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@@ -4047,56 +4048,52 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
}
}

public void testRestoreLocalCheckpointFromTranslog() throws IOException {
engine.close();
InternalEngine actualEngine = null;
try {
final Set<Long> completedSeqNos = new HashSet<>();
final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker(
maxSeqNo,
localCheckpoint) {
@Override
public void markSeqNoAsCompleted(long seqNo) {
super.markSeqNoAsCompleted(seqNo);
completedSeqNos.add(seqNo);
}
};
trimUnsafeCommits(engine.config());
actualEngine = new InternalEngine(engine.config(), supplier);
final int operations = randomIntBetween(0, 1024);
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
for (int i = 0; i < operations; i++) {
if (rarely() && i < operations - 1) {
public void testRestoreLocalHistoryFromTranslog() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
final ArrayList<Long> seqNos = new ArrayList<>();
final int numOps = randomIntBetween(0, 1024);
for (int i = 0; i < numOps; i++) {
if (rarely()) {
continue;
}
expectedCompletedSeqNos.add((long) i);
seqNos.add((long) i);
}

final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
Randomness.shuffle(seqNos);
for (final long seqNo : seqNos) {
final String id = Long.toString(seqNo);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
final Term uid = newUid(doc);
final long time = System.nanoTime();
actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false));
if (rarely()) {
actualEngine.rollTranslogGeneration();
final EngineConfig engineConfig;
final SeqNoStats prevSeqNoStats;
final List<DocIdSeqNoAndTerm> prevDocs;
final int totalTranslogOps;
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
engineConfig = engine.config();
for (final long seqNo : seqNos) {
final String id = Long.toString(seqNo);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1, seqNo, false));
if (rarely()) {
engine.rollTranslogGeneration();
}
if (rarely()) {
engine.flush();
}
}
}
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
final long resetLocalCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
completedSeqNos.clear();
actualEngine.restoreLocalCheckpointFromTranslog();
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
assertThat(completedSeqNos, equalTo(intersection));
assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations));
} finally {
IOUtils.close(actualEngine);
globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getLocalCheckpoint()));
engine.syncTranslog();
prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
prevDocs = getDocIds(engine, true);
totalTranslogOps = engine.getTranslog().totalOperations();
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint()));
assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo()));
assertThat(engine.getTranslog().totalOperations(), equalTo(totalTranslogOps));
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
}


@@ -896,23 +896,17 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
final long globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO;
randomIntBetween(
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");

final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));

final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
indexShard.getPendingPrimaryTerm() + 1,
@@ -946,6 +940,7 @@ public void onFailure(Exception e) {
resyncLatch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
closeShard(indexShard, false);
}
