Skip to content

Commit

Permalink
fix EventSourcedCleanupSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Jan 30, 2024
1 parent 15b7c58 commit aee3108
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import java.util.UUID

import scala.concurrent.duration._

import akka.Done
Expand Down Expand Up @@ -51,6 +53,19 @@ class EventSourcedCleanupSpec

override def typedSystem: ActorSystem[_] = system

// find two different persistenceIds that are both in the slice range 0-255 so that this test can run with
// 4 data partitions
private def pidsWithSliceLessThan256(entityType: String) = {
var pid1: PersistenceId = null
var pid2: PersistenceId = null
while (pid1 == pid2 || persistenceExt.sliceForPersistenceId(pid1.id) > 255 || persistenceExt
.sliceForPersistenceId(pid2.id) > 255) {
pid1 = PersistenceId(entityType, UUID.randomUUID().toString)
pid2 = PersistenceId(entityType, UUID.randomUUID().toString)
}
(pid1, pid2)
}

"EventSourcedCleanup" must {
"delete events for one persistenceId" in {
val ackProbe = createTestProbe[Done]()
Expand Down Expand Up @@ -377,9 +392,8 @@ class EventSourcedCleanupSpec
"delete events for slice before timestamp" in {
val ackProbe = createTestProbe[Done]()
val entityType = nextEntityType()
val pid1 = PersistenceId(entityType, "a")
val pid2 = PersistenceId(entityType, "b")
persistenceExt.sliceForPersistenceId(pid1.id) should not be persistenceExt.sliceForPersistenceId(pid2.id)

var (pid1, pid2) = pidsWithSliceLessThan256(entityType)

val p1 = spawn(Persister(pid1))
val p2 = spawn(Persister(pid2))
Expand All @@ -398,7 +412,7 @@ class EventSourcedCleanupSpec
PersistenceQuery(system).readJournalFor[CurrentEventsBySliceQuery](R2dbcReadJournal.Identifier)
val eventsBefore =
journalQuery
.currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset)
.currentEventsBySlices[Any](entityType, 0, 255, Offset.noOffset)
.runWith(Sink.seq)
.futureValue
eventsBefore.size shouldBe 10
Expand All @@ -412,7 +426,7 @@ class EventSourcedCleanupSpec

val eventsAfter =
journalQuery
.currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset)
.currentEventsBySlices[Any](entityType, 0, 255, Offset.noOffset)
.runWith(Sink.seq)
.futureValue
eventsAfter.count(_.persistenceId == pid1.id) shouldBe 5
Expand Down

0 comments on commit aee3108

Please sign in to comment.