Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Envelope source is different for durable state #464

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.isPayloadDefined")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#SerializedRow.source")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.DurableStateDao#SerializedStateRow.isPayloadDefined")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.JournalDao#SerializedJournalRow.isPayloadDefined")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.SnapshotDao#SerializedSnapshotRow.isPayloadDefined")
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ import org.slf4j.Logger
def seqNr: Long
def dbTimestamp: Instant
def readDbTimestamp: Instant
def isPayloadDefined: Boolean
def source: String
}

trait Dao[SerializedRow] {
Expand Down Expand Up @@ -243,8 +243,7 @@ import org.slf4j.Logger
behindCurrentTime = Duration.Zero,
backtracking = false)
.filter { row =>
val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source)
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(state.latest)))
} else {
Expand Down Expand Up @@ -459,8 +458,7 @@ import org.slf4j.Logger
behindCurrentTime,
backtracking = newState.backtracking)
.filter { row =>
val source = if (row.isPayloadDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the NPE was here

filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, source)
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(newState.currentOffset)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import scala.concurrent.Future
tags: Set[String])
extends BySliceQuery.SerializedRow {
override def seqNr: Long = revision
override def isPayloadDefined: Boolean = payload.isDefined
override def source: String =
// payload = null => lazy loaded for backtracking (ugly, but not worth changing UpdatedDurableState in Akka)
if (payload == null) EnvelopeOrigin.SourceBacktracking else EnvelopeOrigin.SourceQuery
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ private[r2dbc] object JournalDao {
tags: Set[String],
metadata: Option[SerializedEventMetadata])
extends BySliceQuery.SerializedRow {
override def isPayloadDefined: Boolean = payload.isDefined
override def source: String =
if (payload.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[r2dbc] object SnapshotDao {
metadata: Option[SerializedSnapshotMetadata])
extends BySliceQuery.SerializedRow {
override def readDbTimestamp: Instant = dbTimestamp
def isPayloadDefined: Boolean = true
override def source: String = EnvelopeOrigin.SourceQuery
}

final case class SerializedSnapshotMetadata(payload: Array[Byte], serializerId: Int, serializerManifest: String)
Expand Down