Skip to content

Commit

Permalink
feat: Accept PubSub events after idle
Browse files Browse the repository at this point in the history
* PubSub events are ignored if they are too far ahead of backtracking.
* That means that they will always be ignored after an idle period.
* This emits heartbeat events when the query is idle and thereby progress
  the backtracking timestamp in the PubSub filter.
  • Loading branch information
patriknw committed Sep 18, 2024
1 parent 45a2bc4 commit ac57782
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Row],
createEnvelope: (TimestampOffset, Row) => Envelope,
extractOffset: Envelope => TimestampOffset,
createHeartbeat: Instant => Option[Envelope],
settings: R2dbcSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
Expand Down Expand Up @@ -502,12 +503,22 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(newState.currentOffset)))
}

def heeartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1) {
val timestamp = state.latestBacktracking.timestamp.plusMillis(
settings.querySettings.refreshInterval.toMillis * state.idleCount)
createHeartbeat(timestamp)
} else
None
}

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _),
heartbeat = heeartbeat)
}

private def beforeQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ private[r2dbc] object ContinuousQuery {
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
beforeQuery: S => Option[Future[S]] = (_: S) => None,
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))

private case object NextQuery

Expand Down Expand Up @@ -69,7 +71,8 @@ final private[r2dbc] class ContinuousQuery[S, T](
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]])
beforeQuery: S => Option[Future[S]],
heartbeat: S => Option[T])
extends GraphStage[SourceShape[T]] {
import ContinuousQuery._

Expand Down Expand Up @@ -150,8 +153,12 @@ final private[r2dbc] class ContinuousQuery[S, T](
next()
}
})
val sourceWithHeartbeat = heartbeat(newState) match {
case None => source
case Some(h) => Source.single(h).concat(source)
}
val graph = Source
.fromGraph(source)
.fromGraph(sourceWithHeartbeat)
.to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph)
sinkIn.pull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import akka.persistence.query.typed.EventEnvelope
val SourceBacktracking = "BT"
val SourcePubSub = "PS"
val SourceSnapshot = "SN"
val SourceHeartbeat = "HB"

def fromQuery(env: EventEnvelope[_]): Boolean =
env.source == SourceQuery
Expand All @@ -32,6 +33,9 @@ import akka.persistence.query.typed.EventEnvelope
def fromSnapshot(env: EventEnvelope[_]): Boolean =
env.source == SourceSnapshot

def fromHeartbeat(env: EventEnvelope[_]): Boolean =
env.source == SourceHeartbeat

def isFilteredEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => e.filtered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

val extractOffset: EventEnvelope[Any] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]

new BySliceQuery(queryDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
val createHeartbeat: Instant => Option[EventEnvelope[Any]] = { timestamp =>
Some(createEventEnvelopeHeartbeat(timestamp))
}

new BySliceQuery(queryDao, createEnvelope, extractOffset, createHeartbeat, settings, log)(
typedSystem.executionContext)
}

private def bySlice[Event]: BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] =
Expand Down Expand Up @@ -149,6 +154,22 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
tags = row.tags)
}

def createEventEnvelopeHeartbeat(timestamp: Instant): EventEnvelope[Any] = {
val event: Any = 0 // FIXME could be populated with idle slices, represented as bitmap 0-1023
new EventEnvelope(
TimestampOffset(timestamp, Map.empty),
"", // FIXME persistenceId
1L,
eventOption = Some(event),
timestamp.toEpochMilli,
eventMetadata = None,
"", // FIXME entityType
0, // FIXME slice
filtered = true,
source = EnvelopeOrigin.SourceHeartbeat,
Set.empty)
}

private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = {
val event = deserializePayload(row)
// note that it's not possible to filter out FilteredPayload here
Expand All @@ -168,7 +189,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

val extractOffset: EventEnvelope[Event] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]

new BySliceQuery(snapshotDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
val createHeartbeat: Instant => Option[EventEnvelope[Event]] = { timestamp =>
Some(createEventEnvelopeHeartbeat(timestamp).asInstanceOf[EventEnvelope[Event]])
}

new BySliceQuery(snapshotDao, createEnvelope, extractOffset, createHeartbeat, settings, log)(
typedSystem.executionContext)
}

private def createEnvelopeFromSnapshot[Snapshot, Event](
Expand Down Expand Up @@ -521,7 +547,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
env => {
env.offset match {
case t: TimestampOffset =>
if (EnvelopeOrigin.fromBacktracking(env)) {
if (EnvelopeOrigin.fromBacktracking(env) || EnvelopeOrigin.fromHeartbeat(env)) {
latestBacktracking = t.timestamp
env :: Nil
} else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg

val extractOffset: DurableStateChange[A] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]

new BySliceQuery(stateDao, createEnvelope, extractOffset, settings, log)(typedSystem.executionContext)
new BySliceQuery(stateDao, createEnvelope, extractOffset, createHeartbeat = _ => None, settings, log)(
typedSystem.executionContext)
}

override def getObject(persistenceId: String): Future[GetObjectResult[A]] = {
Expand Down

0 comments on commit ac57782

Please sign in to comment.