Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Skip backtracking when far behind #600

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internals
ProblemFilters.exclude[Problem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState*")
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,20 @@ import org.slf4j.Logger
if (backtracking) latestBacktracking
else latest

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
/**
 * Lower timestamp bound of the next query.
 *
 * For a normal (non-backtracking) query this is the timestamp of `latest`. For a backtracking query it is
 * the timestamp of `latestBacktracking`, unless that is more than `backtrackingWindow` before `latest`;
 * in that case the start is moved forward to `latest.timestamp - backtrackingWindow`.
 */
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
  if (!backtracking) latest.timestamp
  else {
    // earliest point a backtracking query is allowed to start from
    val windowStart = latest.timestamp.minus(backtrackingWindow)
    if (windowStart.isAfter(latestBacktracking.timestamp)) windowStart
    else latestBacktracking.timestamp
  }

/**
 * Result of `highestSeenSeqNr` for the offset pair that matches the current mode: the backtracking
 * pair (`previousBacktracking`, `latestBacktracking`) when backtracking, otherwise (`previous`, `latest`).
 */
def nextQueryFromSeqNr: Option[Long] = {
  val (earlier, current) =
    if (backtracking) (previousBacktracking, latestBacktracking)
    else (previous, latest)
  highestSeenSeqNr(earlier, current)
}

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match {
case Some(t) =>
if (backtracking)
if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t)
Expand Down Expand Up @@ -253,7 +257,7 @@ import org.slf4j.Logger
val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
case None =>
Expand Down Expand Up @@ -415,20 +419,36 @@ import org.slf4j.Logger
state.backtracking && state.rowCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered
}

/**
 * Decides whether the next query should be a backtracking query.
 *
 * All of the following must hold, checked in this order:
 *  - backtracking is enabled in the query settings, the state is not already backtracking and
 *    `state.latest` is not `TimestampOffset.Zero`
 *  - the query is not far behind the current wall clock time (see `farBehindWallClock`)
 *  - at least one trigger applies: `newIdleCount >= 5`, at least `3 * bufferSize` rows since the last
 *    backtracking, or `latest` is more than `halfBacktrackingWindow` ahead of `latestBacktracking`
 *
 * Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
 * the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
 */
def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = {
  val querySettings = settings.querySettings

  def eligible: Boolean =
    querySettings.backtrackingEnabled &&
    !state.backtracking &&
    state.latest != TimestampOffset.Zero

  // True when backtracking has passed the initial offset (or there was none) and the reference
  // timestamp is older than `firstBacktrackingQueryWindow` relative to the current clock time.
  def farBehindWallClock: Boolean = {
    val passedInitialOffset =
      initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
    // fall back to latest when there is no previous offset yet
    val referenceTimestamp =
      if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp
    passedInitialOffset &&
    referenceTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
  }

  def idleLongEnough: Boolean =
    newIdleCount >= 5

  def enoughRowsSinceBacktracking: Boolean =
    state.rowCountSinceBacktracking + state.rowCount >= querySettings.bufferSize * 3

  def backtrackingLagsTooMuch: Boolean = {
    val lag = JDuration.between(state.latestBacktracking.timestamp, state.latest.timestamp)
    lag.compareTo(halfBacktrackingWindow) > 0
  }

  eligible &&
  !farBehindWallClock &&
  (idleLongEnough || enoughRowsSinceBacktracking || backtrackingLagsTooMuch)
}

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
val newState =
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

if (switchToBacktracking(state, newIdleCount)) {
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
Expand All @@ -445,7 +465,7 @@ import org.slf4j.Logger
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
// switching from backtracking
state.copy(
rowCount = 0,
rowCountSinceBacktracking = 0,
Expand All @@ -468,9 +488,9 @@ import org.slf4j.Logger
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
val backtrackingInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import scala.jdk.DurationConverters._

object EventsBySliceBacktrackingSpec {
private val BufferSize = 10 // small buffer for testing
Expand Down Expand Up @@ -103,7 +104,7 @@ class EventsBySliceBacktrackingSpec
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)

// don't let behind-current-time be a reason for not finding events
val startTime = InstantFactory.now().minusSeconds(10 * 60)
val startTime = InstantFactory.now().minusSeconds(90)

writeEvent(slice1, pid1, 1L, startTime, "e1-1")
writeEvent(slice1, pid1, 2L, startTime.plusMillis(1), "e1-2")
Expand Down Expand Up @@ -195,7 +196,7 @@ class EventsBySliceBacktrackingSpec
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)

// don't let behind-current-time be a reason for not finding events
val startTime = InstantFactory.now().minusSeconds(10 * 60)
val startTime = InstantFactory.now().minusSeconds(90)

writeEvent(slice1, pid1, 1L, startTime, "e1-1")
writeEvent(slice1, pid1, 2L, startTime.plusMillis(2), "e1-2")
Expand Down Expand Up @@ -245,6 +246,121 @@ class EventsBySliceBacktrackingSpec
result2.cancel()
}

// Verifies that events written with timestamps about 24 hours in the past are delivered by the normal
// query only, and that backtracking envelopes appear again for events written close to the current time.
"skip backtracking when far behind current time" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = query.sliceForPersistenceId(pid1)
val slice2 = query.sliceForPersistenceId(pid2)
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)

// 24 hours before now
val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24)

// 100 events per persistence id, one second apart; pid2's event is 1 ms after pid1's for the same n
(1 to 100).foreach { n =>
writeEvent(slice1, pid1, n, startTime.plusSeconds(n).plusMillis(1), s"e1-$n")
writeEvent(slice2, pid2, n, startTime.plusSeconds(n).plusMillis(2), s"e2-$n")
}

// Runs eventsBySlices over all slices from the given offset, drops heartbeat envelopes and
// requests 1000 elements from the probe.
def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
query
.eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
.filterNot(EnvelopeOrigin.fromHeartbeat)
.runWith(sinkProbe)
.request(1000)

// Asserts persistence id, sequence number and event payload of the envelope and returns its offset.
// An empty eventOption means the envelope is expected to come from backtracking, otherwise from
// the normal query.
def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
env.persistenceId shouldBe pid
env.sequenceNr shouldBe seqNr
if (eventOption.isEmpty)
env.source shouldBe EnvelopeOrigin.SourceBacktracking
else
env.source shouldBe EnvelopeOrigin.SourceQuery
env.eventOption shouldBe eventOption
env.offset
}

// all old events arrive from the normal query, in timestamp order
val result1 = startQuery(NoOffset)
(1 to 100).foreach { n =>
expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
}
// no backtracking
result1.expectNoMessage()

// despite the name this is the current time minus backtracking-behind-current-time
val now = InstantFactory.now().minus(settings.querySettings.backtrackingBehindCurrentTime.toJava)
writeEvent(slice1, pid1, 101, now, "e1-101")
writeEvent(slice2, pid2, 101, now.plusMillis(1), "e2-101")

// the recent events are first seen by the normal query
expect(result1.expectNext(), pid1, 101, Some("e1-101"))
expect(result1.expectNext(), pid2, 101, Some("e2-101"))

// backtracking events
expect(result1.expectNext(), pid1, 101, None)
expect(result1.expectNext(), pid2, 101, None)

result1.cancel()
}

// Verifies that a query started from an old TimestampOffset still delivers the events written before
// that offset as backtracking envelopes, followed by the later events from the normal query, and that
// no further backtracking envelopes follow.
"still make initial backtracking until ahead of start offset" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = query.sliceForPersistenceId(pid1)
val slice2 = query.sliceForPersistenceId(pid2)
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)

// 24 hours before now
val startTime = InstantFactory.now().minusSeconds(60 * 60 * 24)

// sequence numbers 1 and 2 are written within a few milliseconds of startTime,
// i.e. before the start offset used below (startTime + 20 seconds)
writeEvent(slice1, pid1, 1, startTime.plusMillis(1), "e1-1")
writeEvent(slice2, pid2, 1, startTime.plusMillis(2), "e2-1")
writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2")
writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2")

// sequence numbers 3 to 10 are written after the start offset, one second apart
(3 to 10).foreach { n =>
writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n")
writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n")
}

// Runs eventsBySlices over all slices from the given offset, drops heartbeat envelopes and
// requests 1000 elements from the probe.
def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
query
.eventsBySlices[String](entityType, 0, persistenceExt.numberOfSlices - 1, offset)
.filterNot(EnvelopeOrigin.fromHeartbeat)
.runWith(sinkProbe)
.request(1000)

// Asserts persistence id, sequence number and event payload of the envelope and returns its offset.
// An empty eventOption means the envelope is expected to come from backtracking, otherwise from
// the normal query.
def expect(env: EventEnvelope[String], pid: String, seqNr: Long, eventOption: Option[String]): Offset = {
env.persistenceId shouldBe pid
env.sequenceNr shouldBe seqNr
if (eventOption.isEmpty)
env.source shouldBe EnvelopeOrigin.SourceBacktracking
else
env.source shouldBe EnvelopeOrigin.SourceQuery
env.eventOption shouldBe eventOption
env.offset
}

// start 20 seconds after startTime, with no seen sequence numbers in the offset
val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty))
// from backtracking
expect(result1.expectNext(), pid1, 1, None)
expect(result1.expectNext(), pid2, 1, None)
expect(result1.expectNext(), pid1, 2, None)
expect(result1.expectNext(), pid2, 2, None)

// from normal
(3 to 10).foreach { n =>
expect(result1.expectNext(), pid1, n, Some(s"e1-$n"))
expect(result1.expectNext(), pid2, n, Some(s"e2-$n"))
}
// no backtracking
result1.expectNoMessage()

result1.cancel()
}

"predict backtracking filtered events based on latest seen counts" in {
pendingIfMoreThanOneDataPartition()

Expand All @@ -254,7 +370,11 @@ class EventsBySliceBacktrackingSpec
val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)

// use times in the past well outside behind-current-time
val timeZero = InstantFactory.now().truncatedTo(ChronoUnit.SECONDS).minusSeconds(10 * 60)
val timeZero = InstantFactory
.now()
.truncatedTo(ChronoUnit.SECONDS)
.minus(settings.querySettings.backtrackingBehindCurrentTime.toJava)
.minusSeconds(10)

// events around the buffer size (of 10) will share the same timestamp
// to test tracking of seen events that will be filtered on the next cycle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class EventsBySliceSpec
val slice3 = query.sliceForPersistenceId(pid3)
val slice4 = query.sliceForPersistenceId(pid4)
val slices = Seq(slice1, slice2, slice3, slice4)
val t1 = InstantFactory.now()
val t1 = InstantFactory.now().minusSeconds(10)
val t2 = t1.plusMillis(1)

writeEvent(slice1, pid1, 1L, t1, "A1")
Expand Down
Loading