From 8a8d3eea88fa571e17f61827210bc4836f11a8bc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 23 Sep 2024 15:58:52 +0200 Subject: [PATCH] adjust fromTimestamp --- .../r2dbc/internal/BySliceQuery.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 24a147e5..acfbd5c0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -71,16 +71,20 @@ import org.slf4j.Logger if (backtracking) latestBacktracking else latest - def nextQueryFromTimestamp: Instant = - if (backtracking) latestBacktracking.timestamp - else latest.timestamp + def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant = + if (backtracking && latest.timestamp.minus(backtrackingWindow).isAfter(latestBacktracking.timestamp)) + latest.timestamp.minus(backtrackingWindow) + else if (backtracking) + latestBacktracking.timestamp + else + latest.timestamp def nextQueryFromSeqNr: Option[Long] = if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking) else highestSeenSeqNr(previous, latest) - def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = { - buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match { + def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = { + buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match { case Some(t) => if (backtracking) if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t) @@ -253,7 +257,7 @@ import org.slf4j.Logger val fromTimestamp = state.latest.timestamp val fromSeqNr = highestSeenSeqNr(state.previous, state.latest) - val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match { + val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match { case Some(t) => if (t.isBefore(endTimestamp)) t else endTimestamp case None => @@ -478,9 +482,9 @@ import org.slf4j.Logger if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime else settings.querySettings.behindCurrentTime - val fromTimestamp = newState.nextQueryFromTimestamp + val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow) val fromSeqNr = newState.nextQueryFromSeqNr - val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) + val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) if (log.isDebugEnabled()) { val backtrackingInfo =