Skip to content

Commit

Permalink
adjust fromTimestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 9, 2024
1 parent 4892f6a commit 8a8d3ee
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,20 @@ import org.slf4j.Logger
if (backtracking) latestBacktracking
else latest

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
if (backtracking && latest.timestamp.minus(backtrackingWindow).isAfter(latestBacktracking.timestamp))
latest.timestamp.minus(backtrackingWindow)
else if (backtracking)
latestBacktracking.timestamp
else
latest.timestamp

def nextQueryFromSeqNr: Option[Long] =
if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking)
else highestSeenSeqNr(previous, latest)

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match {
case Some(t) =>
if (backtracking)
if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t)
Expand Down Expand Up @@ -253,7 +257,7 @@ import org.slf4j.Logger
val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
case None =>
Expand Down Expand Up @@ -478,9 +482,9 @@ import org.slf4j.Logger
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
val backtrackingInfo =
Expand Down

0 comments on commit 8a8d3ee

Please sign in to comment.