Skip to content

Commit

Permalink
still backtrack until after start offset
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 9, 2024
1 parent 8a8d3ee commit 1759a8a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,20 @@ import org.slf4j.Logger
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

val qSettings = settings.querySettings
val previousTimestamp =
if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp

def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = {
val aheadOfInitial =
initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
val previousTimestamp =
if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp
aheadOfInitial &&
previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
}

qSettings.backtrackingEnabled &&
!state.backtracking &&
state.latest != TimestampOffset.Zero &&
// no backtracking if far behind current wall clock time
previousTimestamp.isAfter(clock.instant().minus(firstBacktrackingQueryWindow)) &&
!disableBacktrackingWhenFarBehindCurrentWallClockTime &&
(newIdleCount >= 5 ||
state.rowCountSinceBacktracking + state.rowCount >= qSettings.bufferSize * 3 ||
JDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,64 @@ class EventsBySliceBacktrackingSpec
result1.cancel()
}

"still make initial backtracking until ahead of start offset" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = query.sliceForPersistenceId(pid1)
val slice2 = query.sliceForPersistenceId(pid2)
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)

val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24)

writeEvent(slice1, pid1, 1, startTime.plusMillis(1), "e1-1")
writeEvent(slice2, pid2, 1, startTime.plusMillis(2), "e2-1")
writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2")
writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2")

(3 to 10).foreach { n =>
writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n")
writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n")
}

def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
query
.eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
.filterNot(EnvelopeOrigin.fromHeartbeat)
.runWith(sinkProbe)
.request(1000)

def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
env.persistenceId shouldBe pid
env.sequenceNr shouldBe seqNr
if (eventOption.isEmpty)
env.source shouldBe EnvelopeOrigin.SourceBacktracking
else
env.source shouldBe EnvelopeOrigin.SourceQuery
env.eventOption shouldBe eventOption
env.offset
}

val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty))
// from backtracking
expect(result1.expectNext(), pid1, 1, None)
expect(result1.expectNext(), pid2, 1, None)
expect(result1.expectNext(), pid1, 2, None)
expect(result1.expectNext(), pid2, 2, None)

// from normal
(3 to 10).foreach { n =>
expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
}
// no backtracking
result1.expectNoMessage()

result1.cancel()
}

"predict backtracking filtered events based on latest seen counts" in {
pendingIfMoreThanOneDataPartition()

Expand Down

0 comments on commit 1759a8a

Please sign in to comment.