Skip to content

Commit

Permalink
Proper EnvelopeOrigin
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Oct 19, 2023
1 parent fd11c0c commit aa314c9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.persistence.query.typed.EventEnvelope
val SourceQuery = ""
val SourceBacktracking = "BT"
val SourcePubSub = "PS"
val SourceSnapshot = "SN"

def fromQuery(env: EventEnvelope[_]): Boolean =
env.source == SourceQuery
Expand All @@ -28,6 +29,9 @@ import akka.persistence.query.typed.EventEnvelope
def fromPubSub(env: EventEnvelope[_]): Boolean =
env.source == SourcePubSub

def fromSnapshot(env: EventEnvelope[_]): Boolean =
env.source == SourceSnapshot

def isFilteredEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => e.filtered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
row.entityType,
row.slice,
filtered = false,
// FIXME move constant to Akka/EnvelopeOrigin
source = "SN",
source = EnvelopeOrigin.SourceSnapshot,
tags = row.tags)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.persistence.r2dbc.query

import java.time.Instant

import akka.Done
import akka.NotUsed
import akka.actor.testkit.typed.scaladsl.LogCapturing
Expand All @@ -20,6 +19,7 @@ import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.EnvelopeOrigin
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -126,7 +126,7 @@ class EventsByPersistenceIdStartingFromSnapshotSpec

val evt17 = result.expectNext()
evt17.event shouldBe expectedSnapshotEvent(17)
evt17.source should ===("SN")
EnvelopeOrigin.fromSnapshot(evt17) shouldBe true
for (i <- 18 to 20) {
result.expectNext().event shouldBe s"e-$i"
}
Expand Down

0 comments on commit aa314c9

Please sign in to comment.