Skip to content

Commit

Permalink
fix: don't trigger heartbeats from idle backtracking
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Dec 10, 2024
1 parent ab5ac4d commit fb44daf
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ import org.slf4j.Logger
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
previousQueryWallClock = Instant.EPOCH,
idleCountBeforeHeartbeat = 0)
}

final case class QueryState(
Expand All @@ -69,7 +70,8 @@ import org.slf4j.Logger
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {
previousQueryWallClock: Instant,
idleCountBeforeHeartbeat: Long) {

def backtracking: Boolean = backtrackingCount > 0

Expand Down Expand Up @@ -453,6 +455,10 @@ import org.slf4j.Logger

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
val newIdleCountBeforeHeartbeat =
if (state.backtracking) state.idleCountBeforeHeartbeat
else if (state.rowCount == 0) state.idleCountBeforeHeartbeat + 1
else 0
// only start tracking query wall clock (for heartbeats) after initial backtracking query
val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH
val newState =
Expand All @@ -473,7 +479,8 @@ import org.slf4j.Logger
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else if (switchFromBacktracking(state)) {
// switching from backtracking
state.copy(
Expand All @@ -483,7 +490,8 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else {
// continue
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -495,7 +503,8 @@ import org.slf4j.Logger
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
}

val behindCurrentTime =
Expand Down Expand Up @@ -553,7 +562,7 @@ import org.slf4j.Logger
}

def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) {
// using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,16 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
env.sequenceNr)
Nil
} else {
if (log.isDebugEnabled()) {
if (latestBacktracking.isAfter(t.timestamp))
log.debug(
"Event from query for persistenceId [{}] seqNr [{}] timestamp [{}]" +
" was before latest timestamp from backtracking or heartbeat [{}].",
env.persistenceId,
env.sequenceNr,
t.timestamp,
latestBacktracking)
}
env :: Nil
}
case _ =>
Expand Down

0 comments on commit fb44daf

Please sign in to comment.