diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index acfbd5c0..0902d43e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -424,14 +424,20 @@ import org.slf4j.Logger // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow val qSettings = settings.querySettings - val previousTimestamp = - if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp + + def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = { + val aheadOfInitial = + initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp) + val previousTimestamp = + if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp + aheadOfInitial && + previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow)) + } qSettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero && - // no backtracking if far behind current wall clock time - previousTimestamp.isAfter(clock.instant().minus(firstBacktrackingQueryWindow)) && + !disableBacktrackingWhenFarBehindCurrentWallClockTime && (newIdleCount >= 5 || state.rowCountSinceBacktracking + state.rowCount >= qSettings.bufferSize * 3 || JDuration diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index db402dac..554d635a 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -303,6 +303,64 @@ class EventsBySliceBacktrackingSpec result1.cancel() } + "still make initial backtracking until ahead of start offset" in { + pendingIfMoreThanOneDataPartition() + + val entityType = nextEntityType() + val pid1 = nextPid(entityType) + val pid2 = nextPid(entityType) + val slice1 = query.sliceForPersistenceId(pid1) + val slice2 = query.sliceForPersistenceId(pid2) + val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem) + + val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24) + + writeEvent(slice1, pid1, 1, startTime.plusMillis(1), "e1-1") + writeEvent(slice2, pid2, 1, startTime.plusMillis(2), "e2-1") + writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2") + writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2") + + (3 to 10).foreach { n => + writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n") + writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n") + } + + def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] = + query + .eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset) + .filterNot(EnvelopeOrigin.fromHeartbeat) + .runWith(sinkProbe) + .request(1000) + + def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = { + env.persistenceId shouldBe pid + env.sequenceNr shouldBe seqNr + if (eventOption.isEmpty) + env.source shouldBe EnvelopeOrigin.SourceBacktracking + else + env.source shouldBe EnvelopeOrigin.SourceQuery + env.eventOption shouldBe eventOption + env.offset + } + + val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty)) + // from backtracking + expect(result1.expectNext(), pid1, 1, None) + expect(result1.expectNext(), pid2, 1, None) + expect(result1.expectNext(), pid1, 2, None) + expect(result1.expectNext(), pid2, 2, None) + + // from normal + (3 to 10).foreach { n => + expect(result1.expectNext(), pid1, n, Some(s"e1-$n")) + expect(result1.expectNext(), pid2, n, Some(s"e2-$n")) + } + // no backtracking + result1.expectNoMessage() + + result1.cancel() + } + "predict backtracking filtered events based on latest seen counts" in { pendingIfMoreThanOneDataPartition()