diff --git a/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes b/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes new file mode 100644 index 00000000..216ec1f6 --- /dev/null +++ b/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes @@ -0,0 +1,6 @@ +# Internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.isPayloadDefined") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.source") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.DurableStateDao#SerializedStateRow.isPayloadDefined") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.JournalDao#SerializedJournalRow.isPayloadDefined") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.SnapshotDao#SerializedSnapshotRow.isPayloadDefined") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 46fc887d..eaa88009 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -146,7 +146,7 @@ import org.slf4j.Logger def seqNr: Long def dbTimestamp: Instant def readDbTimestamp: Instant - def isPayloadDefined: Boolean + def source: String } trait Dao[SerializedRow] { @@ -243,8 +243,7 @@ import org.slf4j.Logger behindCurrentTime = Duration.Zero, backtracking = false) .filter { row => - val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source) + filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } .via(deserializeAndAddOffset(state.latest))) } else { @@ -459,8 +458,7 @@ import org.slf4j.Logger behindCurrentTime, backtracking = newState.backtracking) .filter { row => - val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source) + filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } .via(deserializeAndAddOffset(newState.currentOffset))) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index 9764bb94..ba3cbdd2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -30,7 +30,9 @@ import scala.concurrent.Future tags: Set[String]) extends BySliceQuery.SerializedRow { override def seqNr: Long = revision - override def isPayloadDefined: Boolean = payload.isDefined + override def source: String = + // payload = null => lazy loaded for backtracking (ugly, but not worth changing UpdatedDurableState in Akka) + if (payload == null) EnvelopeOrigin.SourceBacktracking else EnvelopeOrigin.SourceQuery } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala index f2d8ea03..22b046ee 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala @@ -31,7 +31,8 @@ private[r2dbc] object JournalDao { tags: Set[String], metadata: Option[SerializedEventMetadata]) extends BySliceQuery.SerializedRow { - override def isPayloadDefined: Boolean = payload.isDefined + override def source: String = + if (payload.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala index 0dc27b7f..60b7445c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala @@ -31,7 +31,7 @@ private[r2dbc] object SnapshotDao { metadata: Option[SerializedSnapshotMetadata]) extends BySliceQuery.SerializedRow { override def readDbTimestamp: Instant = dbTimestamp - def isPayloadDefined: Boolean = true + override def source: String = EnvelopeOrigin.SourceQuery } final case class SerializedSnapshotMetadata(payload: Array[Byte], serializerId: Int, serializerManifest: String)