Reset replica engine to global checkpoint on promotion #33473
Conversation
When a replica starts following a newly promoted primary, it may have some operations which don't exist on the new primary. We need to discard those operations to align the replica with the new primary. This can be done by resetting the engine from the safe commit, then replaying the local translog up to the global checkpoint.
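Below is a minimal sketch of that flow, pieced together from the snippets discussed later in this review. The method names and the idea of bounding translog replay by the global checkpoint follow the description above, but the exact signatures (for example, recoverFromTranslog taking an upper sequence number) are illustrative rather than the final API.

```java
// Illustrative sketch only: roll the engine back to the safe commit and replay the
// local translog up to the global checkpoint. Names mirror the review snippets below.
void rollbackEngineToGlobalCheckpoint() throws IOException {
    verifyNotClosed();
    // Close the current engine so no further operations touch the old Lucene index.
    IOUtils.close(currentEngineReference.getAndSet(null));
    // Drop commit points that are not safe so the new engine opens from the safe commit.
    trimUnsafeCommits();
    // Open a fresh engine on top of the safe commit.
    final Engine newEngine = createNewEngine(newEngineConfig());
    // Replay the local translog only up to the global checkpoint; operations above it
    // may not exist on the new primary and are discarded by the rollback.
    final long globalCheckpoint = getGlobalCheckpoint();
    newEngine.recoverFromTranslog(globalCheckpoint); // illustrative signature
}
```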
Pinging @elastic/es-distributed
This is the first part of #32867 (comment).
left some comments. This looks much simpler than the overall one.
public boolean isRecovery() {
    return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
}
boolean isLocal() {
Maybe we call it isRemote() and then we don't need to invert the if statements in the Engine?
@@ -1163,11 +1157,16 @@ public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionTy
    PRIMARY,
    REPLICA,
    PEER_RECOVERY,
    LOCAL_TRANSLOG_RECOVERY;
    LOCAL_TRANSLOG_RECOVERY,
    LOCAL_RESETTING;
Should we call this LOCAL_RESET? Then it's consistent with RECOVERY, which is not a continuous form.
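A minimal sketch of how the two naming suggestions in this review (isRemote() above and LOCAL_RESET here) might fit together; this is illustrative only, not the final shape of the enum:

```java
public enum Origin {
    PRIMARY,
    REPLICA,
    PEER_RECOVERY,
    LOCAL_TRANSLOG_RECOVERY,
    LOCAL_RESET; // LOCAL_RESET rather than LOCAL_RESETTING, consistent with RECOVERY

    public boolean isRecovery() {
        return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
    }

    // isRemote() instead of isLocal(): call sites can then use origin.isRemote() directly
    // instead of inverting an isLocal() check.
    public boolean isRemote() {
        return this == PRIMARY || this == REPLICA || this == PEER_RECOVERY;
    }
}
```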
}

private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
                                Runnable onPerOperationRecovered) throws IOException {
Just call it onOperationRecovered?
    }
    shard.close("test", false);
} finally {
    IOUtils.close(shard.store());
Maybe put this into a try-with-resources block?
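One possible shape for that suggestion, assuming the test owns the store reference for the whole block and that closing it here is equivalent to the IOUtils.close call above (illustrative only):

```java
// Store is Closeable (java.io.Closeable), so the finally-based cleanup could become:
try (Closeable ignored = shard.store()) {
    // ... exercise and assert on the shard ...
    shard.close("test", false);
}
```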
        translogRecoveryStats::incrementRecoveredOperations);
}

private int runTranslogRecoveryAfterResetting(Engine engine, Translog.Snapshot snapshot) throws IOException {
Can we add a javadoc comment on what this does vs. the ordinary recovery? I also wonder if we should maybe only have this version, int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, Runnable onPerOperationRecovered) throws IOException, and pass in closures where we actually call it. Since we really only call it from a single place, it would make this class a little less complex.
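A sketch of that consolidation, reusing the names that appear in the diff above; applyTranslogOperation is a hypothetical stand-in for whatever the call site actually invokes per operation:

```java
// Single recovery method; the per-operation side effect is supplied by the caller.
private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot,
                                Engine.Operation.Origin origin,
                                Runnable onOperationRecovered) throws IOException {
    int opsRecovered = 0;
    Translog.Operation operation;
    while ((operation = snapshot.next()) != null) {
        applyTranslogOperation(engine, operation, origin); // hypothetical helper
        opsRecovered++;
        onOperationRecovered.run(); // e.g. update recovery stats, or a no-op when resetting
    }
    return opsRecovered;
}

// Ordinary recovery passes a closure that updates the stats:
//   runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
//           translogRecoveryStats::incrementRecoveredOperations);
// The resetting path would pass its own closure instead of having a separate method.
```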
Thanks @dnhatn. I left a bunch of comments. Looking good.
@@ -833,7 +834,7 @@ public IndexResult index(Index index) throws IOException {
    indexResult = new IndexResult(
        plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (index.origin().isRemote()) {
Maybe rename this to isTranslog? Then it will tie directly to what's happening in this code.
@@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
 * @param checkpoint the local checkpoint to reset this tracker to
 */
public synchronized void resetCheckpoint(final long checkpoint) {
    // TODO: remove this method as we no longer need it.
what are we waiting on?
We have tests which verify that we restore the local checkpoint after resetting it to the global checkpoint. I decided to leave this method in this PR to minimize the changes; I will remove it in the next PR.
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
    recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
    recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
Can we add some javadocs on what onOperationRecovered means?
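Something along these lines would answer the question; the wording is a hedged illustration, not the javadoc that was actually committed:

```java
/**
 * Replays the operations of the given translog snapshot into the engine.
 *
 * @param engine               the engine to apply the translog operations to
 * @param snapshot             the translog snapshot to replay
 * @param origin               the origin the replayed operations are tagged with
 * @param onOperationRecovered callback invoked after each operation has been applied,
 *                             e.g. to update recovery statistics
 * @return the number of operations that were replayed
 */
```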
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
It feels weird to do these things here - this method now only creates an engine and doesn't change the IndexShard fields - imo it shouldn't touch the store (because it doesn't know anything about any currently running engine).
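A hedged sketch of the separation being suggested, reusing names from the snippets in this review: the helper only builds and wires up the engine, and the caller prepares the store beforehand.

```java
// Helper only creates the engine; it never touches the store or other IndexShard state.
private Engine createNewEngine(EngineConfig config) {
    final Engine engine = engineFactory.newReadWriteEngine(config);
    onNewEngine(engine);
    engine.onSettingsChanged();
    return engine;
}

// Caller (the promotion-time reset path) does the store work first:
//   store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen,
//           config.getIndexSettings().getIndexVersionCreated());
//   final Engine newEngine = createNewEngine(newEngineConfig());
```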
final Engine engine = engineFactory.newReadWriteEngine(config);
onNewEngine(engine);
engine.onSettingsChanged();
active.set(true);
Same comment - it's weird that this changes the IndexShard active state without actually exposing the engine.
// - replica1 has {doc1}
// - replica2 has {doc1, doc2}
// - replica3 can have either {doc2} (if operation-based recovery) or {doc1, doc2} (if file-based recovery)
shards.assertAllEqual(initDocs + 1);
w00t
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
List<Translog.Operation> operations = TestTranslog.drainAll(snapshot);
Did we lose the check that initOperations are also part of the snapshot?
@@ -1879,13 +1873,16 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
    SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Simulate resync (without rollback): Noop #1, index #2
acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1);
// Here we try to simulate the primary fail-over without rollback which is no longer the case.
I don't follow this comment. Can you clarify please?
    .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
final Set<String> docsOnReplica;
try {
    docsOnReplica = IndexShardTestCase.getShardDocUIDs(replicaShard);
will it be a lot of work to check that the source, primary terms and seq# are also identical?
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(null));
trimUnsafeCommits();
newEngine = createNewEngine(newEngineConfig());
just wondering. Would it make sense to do the trimUnsafeCommits as part of the new engine creation?
It was before, but we prefer not to modify the Store implicitly (#33473 (comment)).
LGTM
If a shard is empty, it won't roll back its engine on promotion. This commit adjusts the expectation in the rollback test. Relates #33473
* master: (43 commits)
  [HLRC][ML] Add ML put datafeed API to HLRC (elastic#33603)
  Update AWS SDK to 1.11.406 in repository-s3 (elastic#30723)
  Expose CCR stats to monitoring (elastic#33617)
  [Docs] Update match-query.asciidoc (elastic#33610)
  TEST: Adjust rollback condition when shard is empty
  [CCR] Improve shard follow task's retryable error handling (elastic#33371)
  Forbid negative `weight` in Function Score Query (elastic#33390)
  Clarify context suggestions filtering and boosting (elastic#33601)
  Disable CCR REST endpoints if CCR disabled (elastic#33619)
  Lower version on full cluster restart settings test
  Upgrade remote cluster settings (elastic#33537)
  NETWORKING: http.publish_host Should Contain CNAME (elastic#32806)
  Add test coverage for global checkpoint listeners
  Reset replica engine to global checkpoint on promotion (elastic#33473)
  HLRC: ML Delete Forecast API (elastic#33526)
  Remove debug logging in full cluster restart tests (elastic#33612)
  Expose CCR to the transport client (elastic#33608)
  Mute testIndexDeletionWhenNodeRejoins
  SQL: Make Literal a NamedExpression (elastic#33583)
  [DOCS] Adds missing built-in user information (elastic#33585)
  ...
When a replica starts following a newly promoted primary, it may have some operations which don't exist on the new primary. We need to discard those operations to align the replica with the new primary. This can be done by first resetting the engine from the safe commit, then replaying the local translog up to the global checkpoint. Relates #32867
If a shard was serving as a replica when another shard was promoted to primary, then its Lucene index was reset to the global checkpoint. However, if the new primary fails before the primary/replica resync completes and we are now being promoted, we have to restore the reverted operations by replaying the translog to avoid losing acknowledged writes. Relates #33473 Relates #32867
Today we use the version of a DirectoryReader as a component of the key of IndicesRequestCache. This usage is perfectly fine since the version is advanced every time a new change is made into IndexWriter. In other words, two DirectoryReaders with the same version should have the same content. However, this invariant is only guaranteed in the context of a single IndexWriter because the version is reset to the committed version value when IndexWriter is re-opened. Since #33473, each IndexShard may have more than one IndexWriter, and using the version of a DirectoryReader as a part of the cache key can cause IndicesRequestCache to return stale cached values. For example, in #27650, we roll back the engine (i.e., re-open IndexWriter), index new documents, refresh, then make a count request, but the search layer mistakenly returns the count of the DirectoryReader of the previous IndexWriter because the current DirectoryReader has the same version as the old DirectoryReader even though their documents are different. This is possible because these two readers come from different IndexWriters. This commit replaces the version with the reader cache key of IndexReader as a component of the cache key of IndicesRequestCache. Closes #27650 Relates #33473
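A hedged illustration of the idea (simplified, not the actual IndicesRequestCache code, where reader stands for the DirectoryReader in question): key the cache on the reader's cache key rather than its version, because two readers from different IndexWriters can share a version but never share a cache key.

```java
// Lucene's cache helper gives every reader instance a distinct key object, so keying on
// it (instead of reader.getVersion()) cannot return entries computed against another reader.
IndexReader.CacheKey readerKey = reader.getReaderCacheHelper().getKey();
// ... use readerKey, rather than the reader version, as the reader component of the cache key
```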
With Lucene rollback (#33473), we should never have more than one primary term for each sequence number. Therefore we don't have to sort by the primary term when reading soft-deletes.