Skip to content

Commit

Permalink
unique heartbeat persistenceId
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 9, 2024
1 parent 57b5429 commit f125c73
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.query.scaladsl
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.annotation.tailrec
Expand Down Expand Up @@ -113,6 +114,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

// key is tuple of entity type and slice
private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]()
private val heartbeatUuid = UUID.randomUUID().toString
log.debug("Using heartbeat UUID [{}]", heartbeatUuid)

private def heartbeatPersistenceId(entityType: String, slice: Int): String = {
val key = entityType -> slice
Expand All @@ -128,7 +131,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

@tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
if (n < 1000000) {
val pid = PersistenceId.concat(entityType, s"_ht-$n")
// including a uuid to make sure it is not the same as any persistence id of the application
val pid = PersistenceId.concat(entityType, s"_hb-$heartbeatUuid-$n")
if (persistenceExt.sliceForPersistenceId(pid) == slice)
pid
else
Expand Down

0 comments on commit f125c73

Please sign in to comment.