From db6e2f460c26f1608e7a82912110561b379a4130 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 3 Oct 2023 17:05:26 +0200 Subject: [PATCH] fix: Envelope source is different for durable state * NullPointerException in isPayloadDefined * regression when adding the starting from snapshots, i.e. this bug was not released in any final version (but in milestones) * for durable state we use null payload as the backtracking marker in the row, becuse we also have the deleted case, ugly but that goes all the way to Akka's UpdatedDurableState --- .../1.2.0-M5.backwards.excludes/isPayloadDefined.excludes | 6 ++++++ .../akka/persistence/r2dbc/internal/BySliceQuery.scala | 8 +++----- .../akka/persistence/r2dbc/internal/DurableStateDao.scala | 4 +++- .../akka/persistence/r2dbc/internal/JournalDao.scala | 3 ++- .../akka/persistence/r2dbc/internal/SnapshotDao.scala | 2 +- 5 files changed, 15 insertions(+), 8 deletions(-) create mode 100644 core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes diff --git a/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes b/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes new file mode 100644 index 00000000..216ec1f6 --- /dev/null +++ b/core/src/main/mima-filters/1.2.0-M5.backwards.excludes/isPayloadDefined.excludes @@ -0,0 +1,6 @@ +# Internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.isPayloadDefined") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.source") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.DurableStateDao#SerializedStateRow.isPayloadDefined") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.JournalDao#SerializedJournalRow.isPayloadDefined") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.SnapshotDao#SerializedSnapshotRow.isPayloadDefined") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 46fc887d..eaa88009 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -146,7 +146,7 @@ import org.slf4j.Logger def seqNr: Long def dbTimestamp: Instant def readDbTimestamp: Instant - def isPayloadDefined: Boolean + def source: String } trait Dao[SerializedRow] { @@ -243,8 +243,7 @@ import org.slf4j.Logger behindCurrentTime = Duration.Zero, backtracking = false) .filter { row => - val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source) + filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } .via(deserializeAndAddOffset(state.latest))) } else { @@ -459,8 +458,7 @@ import org.slf4j.Logger behindCurrentTime, backtracking = newState.backtracking) .filter { row => - val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source) + filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } .via(deserializeAndAddOffset(newState.currentOffset))) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index 9764bb94..ba3cbdd2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -30,7 +30,9 @@ import scala.concurrent.Future tags: Set[String]) extends BySliceQuery.SerializedRow { override def seqNr: Long = revision - override def isPayloadDefined: Boolean = payload.isDefined + override def source: String = + // payload = null => lazy loaded for backtracking (ugly, but not worth changing UpdatedDurableState in Akka) + if (payload == null) EnvelopeOrigin.SourceBacktracking else EnvelopeOrigin.SourceQuery } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala index f2d8ea03..22b046ee 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala @@ -31,7 +31,8 @@ private[r2dbc] object JournalDao { tags: Set[String], metadata: Option[SerializedEventMetadata]) extends BySliceQuery.SerializedRow { - override def isPayloadDefined: Boolean = payload.isDefined + override def source: String = + if (payload.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala index 0dc27b7f..60b7445c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala @@ -31,7 +31,7 @@ private[r2dbc] object SnapshotDao { metadata: Option[SerializedSnapshotMetadata]) extends BySliceQuery.SerializedRow { override def readDbTimestamp: Instant = dbTimestamp - def isPayloadDefined: Boolean = true + override def source: String = EnvelopeOrigin.SourceQuery } final case class SerializedSnapshotMetadata(payload: Array[Byte], serializerId: Int, serializerManifest: String)