Skip to content

Commit

Permalink
fix: Envelope source is different for durable state
Browse files Browse the repository at this point in the history
* NullPointerException in isPayloadDefined
* regression when adding the starting from snapshots, i.e.
  this bug was not released in any final version (but in milestones)
* for durable state we use null payload as the backtracking marker
  in the row, becuse we also have the deleted case, ugly but that goes all the
  way to Akka's UpdatedDurableState
  • Loading branch information
patriknw committed Oct 3, 2023
1 parent c09b151 commit db6e2f4
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.isPayloadDefined")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.source")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.DurableStateDao#SerializedStateRow.isPayloadDefined")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.JournalDao#SerializedJournalRow.isPayloadDefined")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.SnapshotDao#SerializedSnapshotRow.isPayloadDefined")
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ import org.slf4j.Logger
def seqNr: Long
def dbTimestamp: Instant
def readDbTimestamp: Instant
def isPayloadDefined: Boolean
def source: String
}

trait Dao[SerializedRow] {
Expand Down Expand Up @@ -243,8 +243,7 @@ import org.slf4j.Logger
behindCurrentTime = Duration.Zero,
backtracking = false)
.filter { row =>
val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source)
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(state.latest)))
} else {
Expand Down Expand Up @@ -459,8 +458,7 @@ import org.slf4j.Logger
behindCurrentTime,
backtracking = newState.backtracking)
.filter { row =>
val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source)
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(newState.currentOffset)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import scala.concurrent.Future
tags: Set[String])
extends BySliceQuery.SerializedRow {
override def seqNr: Long = revision
override def isPayloadDefined: Boolean = payload.isDefined
override def source: String =
// payload = null => lazy loaded for backtracking (ugly, but not worth changing UpdatedDurableState in Akka)
if (payload == null) EnvelopeOrigin.SourceBacktracking else EnvelopeOrigin.SourceQuery
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ private[r2dbc] object JournalDao {
tags: Set[String],
metadata: Option[SerializedEventMetadata])
extends BySliceQuery.SerializedRow {
override def isPayloadDefined: Boolean = payload.isDefined
override def source: String =
if (payload.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[r2dbc] object SnapshotDao {
metadata: Option[SerializedSnapshotMetadata])
extends BySliceQuery.SerializedRow {
override def readDbTimestamp: Instant = dbTimestamp
def isPayloadDefined: Boolean = true
override def source: String = EnvelopeOrigin.SourceQuery
}

final case class SerializedSnapshotMetadata(payload: Array[Byte], serializerId: Int, serializerManifest: String)
Expand Down

0 comments on commit db6e2f4

Please sign in to comment.