Skip to content

Commit

Permalink
fix: stuck queries when too many events with same timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 1, 2024
1 parent 6fc1f7e commit 61ac24d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ import org.slf4j.Logger
if (backtracking) latestBacktracking.timestamp
else latest.timestamp

def nextQueryFromSeqNr: Option[Long] =
if (backtracking) highestSeenSeqNr(latestBacktracking)
else highestSeenSeqNr(latest)

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
case Some(t) =>
Expand All @@ -70,6 +74,10 @@ import org.slf4j.Logger
}
}

private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = {
if (offset.seen.isEmpty) None else Some(offset.seen.values.max)
}

object Buckets {
type EpochSeconds = Long
type Count = Long
Expand Down Expand Up @@ -157,6 +165,7 @@ import org.slf4j.Logger
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long], // for events with same timestamp as `fromTimestamp`
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedRow, NotUsed]
Expand Down Expand Up @@ -214,6 +223,9 @@ import org.slf4j.Logger
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)

val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
Expand All @@ -228,7 +240,7 @@ import org.slf4j.Logger
state.queryCount,
minSlice,
maxSlice,
state.latest.timestamp,
fromTimestamp,
toTimestamp,
state.rowCount)

Expand All @@ -238,7 +250,8 @@ import org.slf4j.Logger
entityType,
minSlice,
maxSlice,
state.latest.timestamp,
fromTimestamp,
fromSeqNr,
toTimestamp = Some(toTimestamp),
behindCurrentTime = Duration.Zero,
backtracking = false)
Expand Down Expand Up @@ -312,7 +325,10 @@ import org.slf4j.Logger
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

state.copy(
latestBacktracking = offset,
Expand Down Expand Up @@ -420,6 +436,7 @@ import org.slf4j.Logger
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -454,6 +471,7 @@ import org.slf4j.Logger
minSlice,
maxSlice,
fromTimestamp,
fromSeqNr,
toTimestamp,
behindCurrentTime,
backtracking = newState.backtracking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

override protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else ""

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= ?" else ""

Expand All @@ -51,7 +55,7 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends
FROM ${journalTable(minSlice)}
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedStateRow, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
"SELECT CURRENT_TIMESTAMP AS db_timestamp"

protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else ""

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= ?" else ""

Expand All @@ -96,7 +100,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
FROM ${journalTable(minSlice)}
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
Expand Down Expand Up @@ -209,16 +213,25 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
stmt: Statement,
entityType: String,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant]): Statement = {
stmt
.bind(0, entityType)
.bindTimestamp(1, fromTimestamp)
val index1 = 2
val index2 = fromSeqNr match {
case Some(seqNr) =>
stmt.bindTimestamp(index1, fromTimestamp)
stmt.bind(index1 + 1, seqNr)
index1 + 2
case None => index1
}
toTimestamp match {
case Some(until) =>
stmt.bindTimestamp(2, until)
stmt.bind(3, settings.querySettings.bufferSize)
stmt.bindTimestamp(index2, until)
stmt.bind(index2 + 1, settings.querySettings.bufferSize)
case None =>
stmt.bind(2, settings.querySettings.bufferSize)
stmt.bind(index2, settings.querySettings.bufferSize)
}
}

Expand All @@ -227,6 +240,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = {
Expand All @@ -241,12 +255,13 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
val stmt = connection
.createStatement(
eventsBySlicesRangeSql(
fromSeqNrParam = fromSeqNr.isDefined,
toDbTimestampParam = toTimestamp.isDefined,
behindCurrentTime,
backtracking,
minSlice,
maxSlice))
bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, toTimestamp)
bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, fromSeqNr, toTimestamp)
},
row =>
if (backtracking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedSnapshotRow, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,17 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
}

override protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != @from OR seq_nr >= @fromSeqNr)" else ""

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= @until" else ""

Expand All @@ -139,7 +143,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
FROM ${journalTable(minSlice)}
WHERE entity_type = @entityType
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= @from $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= @from $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = $sqlFalse
ORDER BY db_timestamp, seq_nr"""
}
Expand All @@ -148,11 +152,13 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
stmt: Statement,
entityType: String,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant]): Statement = {
stmt
.bind("@limit", settings.querySettings.bufferSize)
.bind("@entityType", entityType)
.bindTimestamp("@from", fromTimestamp)
fromSeqNr.foreach(seqNr => stmt.bind("@fromSeqNr", seqNr))
toTimestamp.foreach(timestamp => stmt.bindTimestamp("@until", timestamp))
stmt
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ class EventsBySliceSpec
assertFinished(result)
}

"handle more events with same timestamp than buffer size" in new Setup {
val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4
.readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query")
persister ! PersistAll((1 to 10).map(i => s"e-$i").toList)
persister ! Ping(probe.ref)
probe.expectMessage(Done)
val result: TestSubscriber.Probe[EventEnvelope[String]] =
doQuery(entityType, slice, slice, NoOffset, queryWithSmallBuffer)
.runWith(sinkProbe)
.request(11)
for (i <- 1 to 10) {
result.expectNext().event shouldBe s"e-$i"
}
assertFinished(result)
}

"include metadata" in {
val probe = testKit.createTestProbe[Done]()
val entityType = nextEntityType()
Expand Down

0 comments on commit 61ac24d

Please sign in to comment.