From fb44dafdbe793a459a2d19ae45d8e614b8f0ad72 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 29 Nov 2024 17:38:12 +1300 Subject: [PATCH] fix: don't trigger heartbeats from idle backtracking --- .../r2dbc/internal/BySliceQuery.scala | 21 +++++++++++++------ .../query/scaladsl/R2dbcReadJournal.scala | 10 +++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index fdc212a7..063b0b84 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -50,7 +50,8 @@ import org.slf4j.Logger startTimestamp = Instant.EPOCH, startWallClock = Instant.EPOCH, currentQueryWallClock = Instant.EPOCH, - previousQueryWallClock = Instant.EPOCH) + previousQueryWallClock = Instant.EPOCH, + idleCountBeforeHeartbeat = 0) } final case class QueryState( @@ -69,7 +70,8 @@ import org.slf4j.Logger startTimestamp: Instant, startWallClock: Instant, currentQueryWallClock: Instant, - previousQueryWallClock: Instant) { + previousQueryWallClock: Instant, + idleCountBeforeHeartbeat: Long) { def backtracking: Boolean = backtrackingCount > 0 @@ -453,6 +455,10 @@ import org.slf4j.Logger def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = { val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0 + val newIdleCountBeforeHeartbeat = + if (state.backtracking) state.idleCountBeforeHeartbeat + else if (state.rowCount == 0) state.idleCountBeforeHeartbeat + 1 + else 0 // only start tracking query wall clock (for heartbeats) after initial backtracking query val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH val newState = @@ -473,7 +479,8 @@ import org.slf4j.Logger latestBacktracking = fromOffset, backtrackingExpectFiltered = state.latestBacktrackingSeenCount, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } else if (switchFromBacktracking(state)) { // switching from backtracking state.copy( @@ -483,7 +490,8 @@ import org.slf4j.Logger idleCount = newIdleCount, backtrackingCount = 0, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } else { // continue val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0 @@ -495,7 +503,8 @@ import org.slf4j.Logger backtrackingCount = newBacktrackingCount, backtrackingExpectFiltered = state.latestBacktrackingSeenCount, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } val behindCurrentTime = @@ -553,7 +562,7 @@ import org.slf4j.Logger } def heartbeat(state: QueryState): Option[Envelope] = { - if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) { + if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) { // using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit val timestamp = state.startTimestamp.plus( JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime))) diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 14618e0c..2edb7502 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -604,6 +604,16 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat env.sequenceNr) Nil } else { + if (log.isDebugEnabled()) { + if (latestBacktracking.isAfter(t.timestamp)) + log.debug( + "Event from query for persistenceId [{}] seqNr [{}] timestamp [{}]" + + " was before latest timestamp from backtracking or heartbeat [{}].", + env.persistenceId, + env.sequenceNr, + t.timestamp, + latestBacktracking) + } env :: Nil } case _ =>