Skip to content

Commit

Permalink
Fix flaky test InternalEngineTests.testLastRefreshCheckpoint (#9365)
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored Aug 16, 2023
1 parent 8d153ab commit f971e5b
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2784,7 +2784,7 @@ public final long lastRefreshedCheckpoint() {
* Returns the current local checkpoint getting refreshed internally.
*/
public final long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.pendingCheckpoint;
return lastRefreshedCheckpointListener.pendingCheckpoint.get();
}

private final Object refreshIfNeededMutex = new Object();
Expand All @@ -2804,29 +2804,33 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
volatile long pendingCheckpoint;
volatile AtomicLong pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
this.pendingCheckpoint = initialLocalCheckpoint;
this.pendingCheckpoint = new AtomicLong(initialLocalCheckpoint);
}

@Override
public void beforeRefresh() {
// all changes until this point should be visible after refresh
pendingCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint()));
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
updateRefreshedCheckpoint(pendingCheckpoint);
updateRefreshedCheckpoint(pendingCheckpoint.get());
}
}

void updateRefreshedCheckpoint(long checkpoint) {
refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
// This shouldn't be required ideally, but we're also invoking this method from refresh as of now.
// This change is added as safety check to ensure that our checkpoint values are consistent at all times.
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));

}
}

Expand Down

0 comments on commit f971e5b

Please sign in to comment.