Reset replica engine to global checkpoint on promotion #33473

Merged, 14 commits, Sep 12, 2018
13 changes: 6 additions & 7 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -678,12 +678,6 @@ public CommitStats commitStats() {
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;

/**
* Reset the local checkpoint in the tracker to the given local checkpoint
* @param localCheckpoint the new checkpoint to be set
*/
public abstract void resetLocalCheckpoint(long localCheckpoint);

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
@@ -1163,11 +1157,16 @@ public enum Origin {
PRIMARY,
REPLICA,
PEER_RECOVERY,
LOCAL_TRANSLOG_RECOVERY;
LOCAL_TRANSLOG_RECOVERY,
LOCAL_RESET;

public boolean isRecovery() {
return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
}

boolean isRemote() {
return this == PRIMARY || this == REPLICA || this == PEER_RECOVERY;
}
}

public Origin origin() {
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -735,6 +735,7 @@ private boolean canOptimizeAddDocument(Index index) {
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
case LOCAL_RESET:
assert index.isRetry();
return true; // allow to optimize in order to update the max safe time stamp
default:
@@ -833,7 +834,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (index.origin().isRemote()) {
Contributor: maybe rename this to isTranslog? Then it will tie directly to what's happening in this code.
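For illustration of the rename suggested above, one possible shape (a hypothetical sketch, not part of this diff; `isFromTranslog` is an assumed name) inverts the predicate so it names what the call sites actually check, namely whether the operation came from replaying a local translog and therefore must not be added to the translog again:

```java
// Hypothetical alternative to isRemote(): name the predicate after the translog decision
// the call sites make. Assumed name; not part of this PR.
boolean isFromTranslog() {
    return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESET;
}
```

Call sites such as `if (index.origin().isRemote())` would then read `if (index.origin().isFromTranslog() == false)`.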

final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
@@ -1173,7 +1174,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (delete.origin().isRemote()) {
final Translog.Location location;
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Delete(delete, deleteResult));
@@ -1411,7 +1412,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
}
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (noOp.origin().isRemote()) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
@@ -2340,11 +2341,6 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

@Override
public void resetLocalCheckpoint(long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java
@@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method as we no longer need it.
Contributor: what are we waiting on?

Member Author: We have tests that verify we restore the local checkpoint after resetting it to the global checkpoint. I decided to keep this method in this PR to minimize the changes; I will remove it in the next PR.

assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
122 changes: 60 additions & 62 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -163,7 +163,6 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
@@ -1273,16 +1272,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
return result;
}

// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
Contributor: can we add some javadocs explaining what onOperationRecovered means?

Runnable onOperationRecovered) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
Engine.Result result = applyTranslogOperation(operation, origin);
switch (result.getResultType()) {
case FAILURE:
throw result.getFailure();
@@ -1293,9 +1290,8 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
default:
throw new AssertionError("Unknown result type [" + result.getResultType() + "]");
}

opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
onOperationRecovered.run();
} catch (Exception e) {
if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
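Following up on the review request above for documentation of the new runTranslogRecovery parameters, a javadoc along these lines could work; the wording is a hypothetical sketch, not text from the PR:

```java
/**
 * Replays the operations from the given snapshot into the engine.
 *
 * @param origin               the origin to tag each replayed operation with
 *                             (for example LOCAL_TRANSLOG_RECOVERY or LOCAL_RESET)
 * @param onOperationRecovered callback invoked after each operation is successfully replayed,
 *                             for example to update recovery statistics
 * @return the number of operations that were recovered
 */
```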
@@ -1313,8 +1309,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
@@ -1340,30 +1343,22 @@ private void innerOpenEngineAndTranslog() throws IOException {
}
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);

final EngineConfig config = newEngineConfig();

// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");

assertMaxUnsafeAutoIdInCommit();

final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());

createNewEngine(config);
verifyNotClosed();
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
assertSequenceNumbersInCommit();
final EngineConfig config = newEngineConfig();
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
synchronized (mutex) {
assert currentEngineReference.get() == null : "engine is initialized already";
currentEngineReference.set(createNewEngine(config));
}
assert assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

@@ -1463,7 +1458,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
if (origin == Engine.Operation.Origin.PRIMARY) {
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA;
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
assert assertReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
@@ -2164,33 +2159,21 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
}
}

private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
}
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
// inside the callback are not visible. This one enforces happens-before
this.currentEngineReference.set(engine);
}

// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
// settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
Engine engine = getEngineOrNull();

// engine could perhaps be null if we were e.g. concurrently closed:
if (engine != null) {
engine.onSettingsChanged();
}
private Engine createNewEngine(EngineConfig config) throws IOException {
assert Thread.holdsLock(mutex);
verifyNotClosed();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
Contributor: It feels weird to do these things here. This method now only creates an engine but doesn't change the IndexShard fields; IMO it shouldn't touch the store (because it doesn't know anything about any engine currently running against it).

assertMaxUnsafeAutoIdInCommit();
final Engine engine = engineFactory.newReadWriteEngine(config);
onNewEngine(engine);
engine.onSettingsChanged();
active.set(true);
Contributor: Same comment; it's weird that this changes the IndexShard active state without actually exposing the engine.

return engine;
}
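One way to address the concern above about createNewEngine touching the store would be to keep the store preparation in the caller and let createNewEngine only build the engine. This is a hypothetical refactor sketched from calls already present in this diff, not something the PR does; `trimUnsafeCommits` as a helper name is an assumption:

```java
// Hypothetical helper: the caller prepares the store before asking for a new engine,
// so createNewEngine no longer needs to know about commits or the translog on disk.
private void trimUnsafeCommits(EngineConfig config) throws IOException {
    final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
    final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
    final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
    store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
}
```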

protected Engine newEngine(EngineConfig config) {
return engineFactory.newReadWriteEngine(config);
}

private static void persistMetadata(
final ShardPath shardPath,
final IndexSettings indexSettings,
@@ -2314,19 +2297,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
localCheckpoint = NO_OPS_PERFORMED;
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
localCheckpoint = currentGlobalCheckpoint;
getEngine().rollTranslogGeneration();
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
opPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
}
}
@@ -2687,4 +2665,24 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
}
};
}

/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
sync(); // persist the global checkpoint to disk
final long globalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("resetting replica engine from max_seq_no [{}] to global checkpoint [{}]", maxSeqNo, globalCheckpoint);
Contributor: Can this be trace? We already have an info log message before.

final Engine resettingEngine;
synchronized (mutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
resettingEngine = createNewEngine(newEngineConfig());
Contributor: If we do this, why did we need to change how createNewEngine behaves (i.e., update currentEngineReference etc.)?

Member Author: OK, I copied this change from the previous PR where we have a transition to a read-only engine. I'll revert this change and re-introduce it later.

currentEngineReference.set(resettingEngine);
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) ->
Contributor: It would be good to have some kind of progress logs here (e.g., log every 10k ops or so) under debug.

Member Author: We intend to have dedicated recovery stats for the reset.

runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {});
resettingEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}
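To illustrate the progress-logging suggestion from the review exchange above, a guarded debug log could sit inside the replay loop of runTranslogRecovery. This is a hedged sketch only; the 10,000-operation interval and the message are assumptions, and the author plans dedicated recovery stats instead:

```java
// Hypothetical progress logging inside the translog replay loop (not part of this PR).
if (opsRecovered % 10_000 == 0) {
    logger.debug("[translog] replayed [{}] operations so far (origin [{}])", opsRecovered, origin);
}
```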
}
@@ -111,6 +111,7 @@ protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
assertSeqNos();
assertSameDocIdsOnShards();
}
}

@@ -4087,7 +4087,7 @@ public void markSeqNoAsCompleted(long seqNo) {
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
final long resetLocalCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
actualEngine.resetLocalCheckpoint(resetLocalCheckpoint);
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
completedSeqNos.clear();
actualEngine.restoreLocalCheckpointFromTranslog();
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
@@ -49,6 +49,7 @@
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -74,6 +75,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
@@ -520,17 +522,11 @@ public void testSeqNoCollision() throws Exception {
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
List<Translog.Operation> operations = TestTranslog.drainAll(snapshot);
Contributor: We lost the check that initOperations are also part of the snapshot?

assertThat(op2, isIn(operations));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
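One way to restore the dropped check the reviewer mentions above, sketched as a hypothetical addition to this test (`operations`, `initOperations`, and `TestTranslog.drainAll` come from the surrounding test; `hasItems` would need a static import from org.hamcrest.Matchers):

```java
// Hypothetical re-addition of the initOperations check; not part of this diff.
assertThat(operations, hasItems(initOperations.toArray(new Translog.Operation[0])));
```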
// TODO: We should assert the content of shards in the ReplicationGroup.
// Without rollback replicas(current implementation), we don't have the same content across shards:
// - replica1 has {doc1}
// - replica2 has {doc1, doc2}
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
shards.assertAllEqual(initDocs + 1);
Contributor: w00t

}
}
