fix local processed checkpoint update #2576
assert Thread.holdsLock(this);
advanceMaxSeqNo(seqNo);
if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) {
    return;
}
A fair amount of this code is duplicated from the existing markSeqNo method. Since the first check is simply a bounds check, we can write a shouldUpdateSeqNo method that does this and returns a boolean to decide if further processing is necessary. This can be invoked by wrapper methods that perform the update by compareAndSet (as below) or via updateCheckpoint (as with the original method).
    return;
}
try {
    processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo);
Shouldn't this be using updateCheckpoint to ensure that the persistedSeqNo is kept up to date?
The call to updateCheckpoint to ensure persistedSeqNo is up to date still takes place when markSeqNoAsPersisted is called. In the case of segrep, we update processedSeqNo just once at the end since indexing doesn't take place on the replica - we omit the call to updateCheckpoint since it checks that all seq numbers till that point are consecutively processed.
Thanks for the explanation! I think we should capture this bit in the Javadoc for fastForwardProcessedSeqNo:
In the case of segrep, we update processedSeqNo just once at the end since indexing doesn't take place on the replica - we omit the call to updateCheckpoint since it checks that all seq numbers till that point are consecutively processed.
    tracker = createEmptyTracker();
}

public void testSimpleSegrepPrimaryProcessed() {
These should really be 3 separate tests since you're testing different use-cases
- Base case
- Idempotency
- Persisted-vs-processed being out-of-sync, checkpoint update for seg-rep, and finally idempotency again
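A rough sketch of how that split might look. Everything here is an assumption made for illustration: `MiniTracker` is a hypothetical, heavily simplified stand-in for the real tracker (it does no gap tracking), and the test names and the plain `check` helper are invented rather than taken from the PR or its test framework.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the tracker under test; NOT the real class.
class MiniTracker {
    private final AtomicLong processed = new AtomicLong(-1);
    private final AtomicLong persisted = new AtomicLong(-1);

    // Simplified: just moves the persisted checkpoint forward.
    synchronized void markSeqNoAsPersisted(long seqNo) {
        if (seqNo > persisted.get()) {
            persisted.set(seqNo);
        }
    }

    // Jumps the processed checkpoint to seqNo, never past the persisted one.
    synchronized void fastForwardProcessedSeqNo(long seqNo) {
        if (seqNo <= processed.get() || seqNo > persisted.get()) {
            return;
        }
        processed.set(seqNo);
    }

    long getProcessedCheckpoint() { return processed.get(); }
}

class FastForwardTests {
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // 1. Base case: a persisted seqNo can be fast-forwarded to.
    static void testFastForwardBaseCase() {
        MiniTracker tracker = new MiniTracker();
        tracker.markSeqNoAsPersisted(0);
        tracker.fastForwardProcessedSeqNo(0);
        check(tracker.getProcessedCheckpoint() == 0, "base case");
    }

    // 2. Idempotency: repeating the same call changes nothing.
    static void testFastForwardIsIdempotent() {
        MiniTracker tracker = new MiniTracker();
        tracker.markSeqNoAsPersisted(0);
        tracker.fastForwardProcessedSeqNo(0);
        tracker.fastForwardProcessedSeqNo(0);
        check(tracker.getProcessedCheckpoint() == 0, "idempotency");
    }

    // 3. Persisted ahead of processed, then the seg-rep update, then idempotency again.
    static void testFastForwardWhenPersistedIsAhead() {
        MiniTracker tracker = new MiniTracker();
        tracker.markSeqNoAsPersisted(5);
        check(tracker.getProcessedCheckpoint() == -1, "processed lags persisted");
        tracker.fastForwardProcessedSeqNo(6); // beyond persisted: ignored
        check(tracker.getProcessedCheckpoint() == -1, "upper bound respected");
        tracker.fastForwardProcessedSeqNo(5);
        tracker.fastForwardProcessedSeqNo(5);
        check(tracker.getProcessedCheckpoint() == 5, "caught up and stable");
    }

    static void runAll() {
        testFastForwardBaseCase();
        testFastForwardIsIdempotent();
        testFastForwardWhenPersistedIsAhead();
    }
}
```

Keeping each scenario in its own method means a failure points at one use-case instead of at a long combined test.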
 *
 * @param seqNo the sequence number to mark as processed
 */
public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) {
A suggestion here: can we name this after what it's doing (fast-forwarding to seqNo) rather than the replication strategy that invokes it?
bitSet.set(offset);
if (seqNo == checkPoint.get() + 1) {
    updateCheckpoint(checkPoint, bitSetMap);
    if (segrep) {
I don't think we need this segrep-specific logic here; it can live inside fastForwardProcessedSeqNo, leaving this method as is. This class shouldn't care about segrep vs. non-segrep.
It also looks like waitForProcessedOpsToComplete is called from tests only. I'm not sure we have a use case for it with this call path? If not, then we won't need to wait/notify.
Removed the wait/notify and separated out the segrep specific logic
}

private boolean shouldUpdateSeqNo(final long seqNo, boolean segrep, final AtomicLong checkpoint) {
    return !((seqNo <= checkpoint.get()) || (segrep && seqNo > persistedCheckpoint.get()));
I think you can put the check against the persisted checkpoint inside of fastForwardProcessedSeqNo and avoid the segrep-specific flag.
Changed the logic in the check to avoid segrep specific flag
}

private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
/**
 * Updates the processed checkpoint to the provided sequence number if segment replication is enabled
Nit - Since there's no check here for seg-rep, the wording of "if segment replication is enabled" is confusing. Rephrase to:
Updates the processed sequence checkpoint to the given value. This does not update the persisted checkpoint value. This method is only used for segment replication.
bitSet.set(offset);
if (seqNo == checkPoint.get() + 1) {
    updateCheckpoint(checkPoint, bitSetMap);
    if (segrep) {
I realize that this was done in response to my comment, but this may be too far in the direction of unification 😄
What do you think of the following structure?
- Instead of branching on this boolean value, have this method return the boolean from shouldUpdateSeqNo (or in-line that method's logic here).
- Have a single method that executes the else part of this - both markSeqNoAsPersisted and markSeqNoAsProcessed can invoke this method. This will cover the existing code flows.
- Finally, move the if clause portion directly into fastForwardProcessedSeqNo - this will cover the seg-rep code flow.
shouldUpdateSeqNo doesn't have the logic to return a boolean representing segrep, but followed the other suggestions to have separate code flows
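To make the "separate code flows" in this thread concrete, here is a minimal sketch, not the code that was merged. `TwoPathTracker` is a hypothetical class, and a `HashSet` stands in for the bit sets the real tracker uses. It illustrates the point made earlier in the review: the shared path only advances a checkpoint across a gap-free run of sequence numbers, while the fast-forward path skips that check and is bounded by the persisted checkpoint instead.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified tracker used only to illustrate the two code flows.
class TwoPathTracker {
    private final AtomicLong processedCheckpoint = new AtomicLong(-1);
    private final AtomicLong persistedCheckpoint = new AtomicLong(-1);
    private final Set<Long> processedSeqNos = new HashSet<>();
    private final Set<Long> persistedSeqNos = new HashSet<>();

    // Existing flows: both delegate to the shared, contiguity-checking path.
    synchronized void markSeqNoAsProcessed(long seqNo) {
        markSeqNo(seqNo, processedCheckpoint, processedSeqNos);
    }

    synchronized void markSeqNoAsPersisted(long seqNo) {
        markSeqNo(seqNo, persistedCheckpoint, persistedSeqNos);
    }

    // Seg-rep flow: jump straight to seqNo without requiring that every
    // earlier sequence number was processed one by one.
    synchronized void fastForwardProcessedSeqNo(long seqNo) {
        final long current = processedCheckpoint.get();
        if (seqNo <= current || seqNo > persistedCheckpoint.get()) {
            return; // already covered, or not yet persisted
        }
        processedCheckpoint.compareAndSet(current, seqNo);
    }

    // Shared path: record seqNo, then advance only if it is the next expected one.
    private void markSeqNo(long seqNo, AtomicLong checkpoint, Set<Long> seen) {
        if (seqNo <= checkpoint.get()) {
            return;
        }
        seen.add(seqNo);
        if (seqNo == checkpoint.get() + 1) {
            updateCheckpoint(checkpoint, seen);
        }
    }

    // Advances the checkpoint across a consecutive run of recorded seqNos.
    private void updateCheckpoint(AtomicLong checkpoint, Set<Long> seen) {
        long next = checkpoint.get() + 1;
        while (seen.remove(next)) {
            checkpoint.set(next);
            next++;
        }
    }

    long getProcessedCheckpoint() { return processedCheckpoint.get(); }
    long getPersistedCheckpoint() { return persistedCheckpoint.get(); }
}
```

With sequence numbers 0 to 2 persisted, calling `markSeqNoAsProcessed(2)` alone leaves the processed checkpoint at -1 because 0 and 1 are missing, whereas `fastForwardProcessedSeqNo(2)` moves it to 2 in one step.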
    markSeqNo(seqNo, processedCheckpoint, null, true);
}

private boolean shouldUpdateSeqNo(final long seqNo, boolean segrep, final AtomicLong checkpoint) {
As @mch2 alluded to in an earlier comment - if you're having to write variables to model use-cases, then that's usually a sign of an overfit. Consider having this method signature be:
private boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, @Nullable final AtomicLong upperBound)
Also see my other comment.
Followed the signature suggested
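For reference, a bounds check with the suggested shape could be sketched as follows. This is an illustration under assumptions, not the merged implementation: `SeqNoBounds` is a hypothetical holder class, the method is made static so it stands alone, and the `@Nullable` annotation is replaced by a comment because it is not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder class so the helper can be shown in isolation.
class SeqNoBounds {
    /**
     * Returns true when seqNo is strictly above lowerBound and, if an
     * upperBound is supplied, not above it. The check knows nothing about
     * replication strategies; callers choose the bounds.
     *
     * @param upperBound may be null, meaning "no upper limit"
     */
    static boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, final AtomicLong upperBound) {
        return !((seqNo <= lowerBound.get()) || (upperBound != null && seqNo > upperBound.get()));
    }
}
```

Under this shape, the regular mark-as-processed and mark-as-persisted flows would pass `null` for the upper bound, while the fast-forward flow would pass the persisted checkpoint, so no use-case flag is needed.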
Signed-off-by: Poojita Raj <[email protected]>
* fix local processed checkpoint update
  Signed-off-by: Poojita Raj <[email protected]>
* separated tests + wrapper function
  Signed-off-by: Poojita Raj <[email protected]>
…t replication] (#2576) (#2883)
* fix local processed checkpoint update (#2576)
  Signed-off-by: Poojita Raj <[email protected]>
* separated tests + wrapper function
  Signed-off-by: Poojita Raj <[email protected]>
* moved tests + compareAndSet change
  Signed-off-by: Poojita Raj <[email protected]>
Signed-off-by: Poojita Raj [email protected]
Description
Previously, the local processed checkpoint was not updated under segment replication. This PR fixes that and adds unit tests for the change.
Issues Resolved
Resolves #2358
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.